/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table The table name to read from.
   * @param columns The columns to scan.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table The table name to read from.
   * @param columns The columns to scan.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }
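  // Illustrative sketch (not part of the original class): a minimal driver wiring a TableMap
  // job with the simplest overload above. "mytable", "cf:", MyDriver, and MyMapper are
  // hypothetical placeholders; MyMapper is assumed to implement
  // TableMap<ImmutableBytesWritable, Result>.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   TableMapReduceUtil.initTableMapJob("mytable", "cf:",
  //       MyMapper.class, ImmutableBytesWritable.class, Result.class, job);
  //   JobClient.runJob(job);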
  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table The table name to read from.
   * @param columns The columns to scan.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat The input format class to use for the job.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace? really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot.
   * It bypasses hbase servers and reads directly from snapshot files.
   *
   * @param snapshotScans map of snapshot name to scans on that snapshot.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   *   should have write permissions to this directory, and this should not be a subdirectory of
   *   rootdir. After the job is finished, the restore directory can be deleted.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
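  // Illustrative sketch (not part of the original class): reading two snapshots, each with a
  // full-table Scan, entirely from snapshot files. The snapshot names, restore path, MyDriver,
  // and MyMapper are hypothetical placeholders.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
  //   snapshotScans.put("snapshot_a", Collections.singletonList(new Scan()));
  //   snapshotScans.put("snapshot_b", Collections.singletonList(new Scan()));
  //   TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyMapper.class,
  //       ImmutableBytesWritable.class, Result.class, job, true,
  //       new Path("/tmp/snapshot-restore"));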
  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns The columns to scan.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   *   should have write permissions to this directory, and this should not be a subdirectory of
   *   rootdir. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
      throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns The columns to scan.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param jobConf The current job to adjust. Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   *   should have write permissions to this directory, and this should not be a subdirectory of
   *   rootdir. After the job is finished, the restore directory can be deleted.
   * @param splitAlgo algorithm used to split each region's key range into input splits
   * @param numSplitsPerRegion how many input splits to generate per one region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf jobConf,
      boolean addDependencyJars, Path tmpRestoreDir,
      RegionSplitter.SplitAlgorithm splitAlgo,
      int numSplitsPerRegion)
      throws IOException {
    TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
  }
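  // Illustrative sketch (not part of the original class): mapping over a snapshot with two
  // input splits per region, restoring into a hypothetical HDFS directory. The snapshot name,
  // restore path, MyDriver, and MyMapper are placeholders; UniformSplit is one of the
  // RegionSplitter algorithms shipped with HBase.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "cf:", MyMapper.class,
  //       ImmutableBytesWritable.class, Result.class, job, true,
  //       new Path("hdfs:///tmp/snapshot-restore"),
  //       new RegionSplitter.UniformSplit(), 2);
  //   JobClient.runJob(job);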
  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
      Class<? extends TableReduce> reducer, JobConf job)
      throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job configuration to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   *   default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
      Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
      throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job configuration to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   *   default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
      Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
      boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions =
        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
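  // Illustrative sketch (not part of the original class): wiring a reduce job that writes Puts
  // back to a hypothetical output table, partitioned by region so reducers line up with the
  // table's regions. "output_table", MyDriver, and MyReducer are placeholders; MyReducer is
  // assumed to implement TableReduce<ImmutableBytesWritable, Put>.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   TableMapReduceUtil.initTableReduceJob("output_table", MyReducer.class, job,
  //       HRegionPartitioner.class);
  //   JobClient.runJob(job);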
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table The table to get the region count for.
   * @param job The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job)
      throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table The table to get the region count for.
   * @param job The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job)
      throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table The table to get the region count for.
   * @param job The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
      throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table The table to get the region count for.
   * @param job The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
      throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   *   iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(
        job,
        job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(),
        job.getOutputKeyClass(),
        job.getOutputValueClass(),
        job.getPartitionerClass(),
        job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
        job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
        job.getCombinerClass());
  }
}