/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Utility for {@link TableMap} and {@link TableReduce}.
 */
@InterfaceAudience.Public
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf,
   * optionally shipping dependency jars with the job.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job
   *          classes via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }
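  // A minimal driver sketch (illustration only, not part of this class). The table name
  // "mytable", the column list "info:name info:age", and MyTableMap (a hypothetical TableMap
  // implementation emitting ImmutableBytesWritable/Result pairs) are placeholders:
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create());
  //   job.setJobName("scan-mytable");
  //   TableMapReduceUtil.initTableMapJob("mytable", "info:name info:age",
  //     MyTableMap.class, ImmutableBytesWritable.class, Result.class, job);
  //   JobClient.runJob(job);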
  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job
   *          classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format to use.
   */
  public static void initTableMapJob(String table, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
      Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // Swallowed to preserve this method's signature; the failure is surfaced on stderr.
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot. It bypasses HBase servers and reads directly from snapshot files.
   *
   * @param snapshotScans  map of snapshot name to scans on that snapshot.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust. Make sure the passed job is carrying all
   *          necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job
   *          classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir  a temporary directory to copy the snapshot files into.
   * @throws IOException When setting up the job fails.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
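  // Usage sketch for reading two snapshots in one job (the snapshot names, restore path,
  // and MyTableMap are hypothetical; the restore directory must be writable by the current
  // user and must not live under the HBase rootdir):
  //
  //   Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
  //   snapshotScans.put("snapshot_a", Collections.singletonList(new Scan()));
  //   snapshotScans.put("snapshot_b", Collections.singletonList(new Scan()));
  //   TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
  //     ImmutableBytesWritable.class, Result.class, job, true,
  //     new Path("/tmp/snapshot-restore"));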
  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName  The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust. Make sure the passed job is carrying all
   *          necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job
   *          classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir  a temporary directory to copy the snapshot files into. The current
   *          user should have write permissions to this directory, and it should not be a
   *          subdirectory of rootdir. The restore directory can be deleted once the job is
   *          finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
      throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot, generating a configurable number of
   * input splits per region. It bypasses HBase servers and reads directly from snapshot files.
   *
   * @param snapshotName  The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param jobConf  The current job to adjust. Make sure the passed job is carrying all
   *          necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job
   *          classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir  a temporary directory to copy the snapshot files into. The current
   *          user should have write permissions to this directory, and it should not be a
   *          subdirectory of rootdir. The restore directory can be deleted once the job is
   *          finished.
   * @param splitAlgo  The algorithm used to split each region.
   * @param numSplitsPerRegion  how many input splits to generate per region.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf jobConf,
      boolean addDependencyJars, Path tmpRestoreDir,
      RegionSplitter.SplitAlgorithm splitAlgo,
      int numSplitsPerRegion)
      throws IOException {
    TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
  }
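  // Usage sketch for the split-per-region overload (snapshot name and restore path are
  // hypothetical). Two splits per region yields twice as many map tasks as the default
  // one-split-per-region layout:
  //
  //   TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "info:name",
  //     MyTableMap.class, ImmutableBytesWritable.class, Result.class, job,
  //     true, new Path("/tmp/snapshot-restore"),
  //     new RegionSplitter.UniformSplit(), 2);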
  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
      Class<? extends TableReduce> reducer, JobConf job)
      throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   *          default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
      Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
      throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   *          default partitioner.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job
   *          classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
      Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
      boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
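  // Usage sketch for the reduce side (the output table and MyTableReduce are hypothetical).
  // Passing HRegionPartitioner groups each reducer's output by target region, and, as the
  // method above shows, caps the number of reduce tasks at the table's region count:
  //
  //   TableMapReduceUtil.initTableReduceJob("output_table", MyTableReduce.class, job,
  //     HRegionPartitioner.class);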
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation-related props from the launcher job to the MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // obtain an HBase authentication token for the current user and add it to the job
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job) throws IOException {
    job.setNumReduceTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job) throws IOException {
    job.setNumMapTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to fetch and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job  The current job configuration to adjust.
   * @param batchSize  The number of rows to fetch and cache with each scanner iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
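  // Usage sketch for the sizing helpers (the table name and caching value are illustrative):
  // align reducers with the table's region count, then trade heap for fewer scanner
  // round-trips:
  //
  //   TableMapReduceUtil.setNumReduceTasks("mytable", job);  // one reducer per region
  //   TableMapReduceUtil.setScannerCaching(job, 500);        // cache 500 rows per iteration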
  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(
        job,
        job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(),
        job.getOutputKeyClass(),
        job.getOutputValueClass(),
        job.getPartitionerClass(),
        job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
        job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
        job.getCombinerClass());
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}