001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import java.io.File; 022import java.io.IOException; 023import java.net.URL; 024import java.net.URLDecoder; 025import java.util.ArrayList; 026import java.util.Base64; 027import java.util.Collection; 028import java.util.Enumeration; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.zip.ZipEntry; 035import java.util.zip.ZipFile; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.MetaTableAccessor; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047import org.apache.hadoop.hbase.client.Connection; 048import org.apache.hadoop.hbase.client.ConnectionFactory; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 054import org.apache.hadoop.hbase.security.User; 055import org.apache.hadoop.hbase.security.UserProvider; 056import org.apache.hadoop.hbase.security.token.TokenUtil; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.RegionSplitter; 059import org.apache.hadoop.hbase.zookeeper.ZKConfig; 060import org.apache.hadoop.io.Writable; 061import org.apache.hadoop.mapreduce.InputFormat; 062import org.apache.hadoop.mapreduce.Job; 063import org.apache.hadoop.util.StringUtils; 064 065import com.codahale.metrics.MetricRegistry; 066 067/** 068 * Utility for {@link TableMapper} and {@link TableReducer} 069 */ 070@SuppressWarnings({ "rawtypes", "unchecked" }) 071@InterfaceAudience.Public 072public class TableMapReduceUtil { 073 private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class); 074 075 /** 076 * Use this before submitting a TableMap job. It will appropriately set up 077 * the job. 078 * 079 * @param table The table name to read from. 080 * @param scan The scan instance with the columns, time range etc. 081 * @param mapper The mapper class to use. 082 * @param outputKeyClass The class of the output key. 083 * @param outputValueClass The class of the output value. 084 * @param job The current job to adjust. Make sure the passed job is 085 * carrying all necessary HBase configuration. 086 * @throws IOException When setting up the details fails. 087 */ 088 public static void initTableMapperJob(String table, Scan scan, 089 Class<? extends TableMapper> mapper, 090 Class<?> outputKeyClass, 091 Class<?> outputValueClass, Job job) 092 throws IOException { 093 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, 094 job, true); 095 } 096 097 098 /** 099 * Use this before submitting a TableMap job. It will appropriately set up 100 * the job. 101 * 102 * @param table The table name to read from. 103 * @param scan The scan instance with the columns, time range etc. 104 * @param mapper The mapper class to use. 105 * @param outputKeyClass The class of the output key. 106 * @param outputValueClass The class of the output value. 107 * @param job The current job to adjust. Make sure the passed job is 108 * carrying all necessary HBase configuration. 109 * @throws IOException When setting up the details fails. 110 */ 111 public static void initTableMapperJob(TableName table, 112 Scan scan, 113 Class<? extends TableMapper> mapper, 114 Class<?> outputKeyClass, 115 Class<?> outputValueClass, 116 Job job) throws IOException { 117 initTableMapperJob(table.getNameAsString(), 118 scan, 119 mapper, 120 outputKeyClass, 121 outputValueClass, 122 job, 123 true); 124 } 125 126 /** 127 * Use this before submitting a TableMap job. It will appropriately set up 128 * the job. 129 * 130 * @param table Binary representation of the table name to read from. 131 * @param scan The scan instance with the columns, time range etc. 132 * @param mapper The mapper class to use. 133 * @param outputKeyClass The class of the output key. 134 * @param outputValueClass The class of the output value. 135 * @param job The current job to adjust. Make sure the passed job is 136 * carrying all necessary HBase configuration. 137 * @throws IOException When setting up the details fails. 138 */ 139 public static void initTableMapperJob(byte[] table, Scan scan, 140 Class<? extends TableMapper> mapper, 141 Class<?> outputKeyClass, 142 Class<?> outputValueClass, Job job) 143 throws IOException { 144 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, 145 job, true); 146 } 147 148 /** 149 * Use this before submitting a TableMap job. It will appropriately set up 150 * the job. 151 * 152 * @param table The table name to read from. 153 * @param scan The scan instance with the columns, time range etc. 154 * @param mapper The mapper class to use. 155 * @param outputKeyClass The class of the output key. 156 * @param outputValueClass The class of the output value. 157 * @param job The current job to adjust. Make sure the passed job is 158 * carrying all necessary HBase configuration. 159 * @param addDependencyJars upload HBase jars and jars for any of the configured 160 * job classes via the distributed cache (tmpjars). 161 * @throws IOException When setting up the details fails. 162 */ 163 public static void initTableMapperJob(String table, Scan scan, 164 Class<? extends TableMapper> mapper, 165 Class<?> outputKeyClass, 166 Class<?> outputValueClass, Job job, 167 boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) 168 throws IOException { 169 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, 170 addDependencyJars, true, inputFormatClass); 171 } 172 173 174 /** 175 * Use this before submitting a TableMap job. It will appropriately set up 176 * the job. 177 * 178 * @param table The table name to read from. 179 * @param scan The scan instance with the columns, time range etc. 180 * @param mapper The mapper class to use. 181 * @param outputKeyClass The class of the output key. 182 * @param outputValueClass The class of the output value. 183 * @param job The current job to adjust. Make sure the passed job is 184 * carrying all necessary HBase configuration. 185 * @param addDependencyJars upload HBase jars and jars for any of the configured 186 * job classes via the distributed cache (tmpjars). 187 * @param initCredentials whether to initialize hbase auth credentials for the job 188 * @param inputFormatClass the input format 189 * @throws IOException When setting up the details fails. 190 */ 191 public static void initTableMapperJob(String table, Scan scan, 192 Class<? extends TableMapper> mapper, 193 Class<?> outputKeyClass, 194 Class<?> outputValueClass, Job job, 195 boolean addDependencyJars, boolean initCredentials, 196 Class<? extends InputFormat> inputFormatClass) 197 throws IOException { 198 job.setInputFormatClass(inputFormatClass); 199 if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); 200 if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass); 201 job.setMapperClass(mapper); 202 if (Put.class.equals(outputValueClass)) { 203 job.setCombinerClass(PutCombiner.class); 204 } 205 Configuration conf = job.getConfiguration(); 206 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 207 conf.set(TableInputFormat.INPUT_TABLE, table); 208 conf.set(TableInputFormat.SCAN, convertScanToString(scan)); 209 conf.setStrings("io.serializations", conf.get("io.serializations"), 210 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 211 CellSerialization.class.getName()); 212 if (addDependencyJars) { 213 addDependencyJars(job); 214 } 215 if (initCredentials) { 216 initCredentials(job); 217 } 218 } 219 220 /** 221 * Use this before submitting a TableMap job. It will appropriately set up 222 * the job. 223 * 224 * @param table Binary representation of the table name to read from. 225 * @param scan The scan instance with the columns, time range etc. 226 * @param mapper The mapper class to use. 227 * @param outputKeyClass The class of the output key. 228 * @param outputValueClass The class of the output value. 229 * @param job The current job to adjust. Make sure the passed job is 230 * carrying all necessary HBase configuration. 231 * @param addDependencyJars upload HBase jars and jars for any of the configured 232 * job classes via the distributed cache (tmpjars). 233 * @param inputFormatClass The class of the input format 234 * @throws IOException When setting up the details fails. 235 */ 236 public static void initTableMapperJob(byte[] table, Scan scan, 237 Class<? extends TableMapper> mapper, 238 Class<?> outputKeyClass, 239 Class<?> outputValueClass, Job job, 240 boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) 241 throws IOException { 242 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, 243 outputValueClass, job, addDependencyJars, inputFormatClass); 244 } 245 246 /** 247 * Use this before submitting a TableMap job. It will appropriately set up 248 * the job. 249 * 250 * @param table Binary representation of the table name to read from. 251 * @param scan The scan instance with the columns, time range etc. 252 * @param mapper The mapper class to use. 253 * @param outputKeyClass The class of the output key. 254 * @param outputValueClass The class of the output value. 255 * @param job The current job to adjust. Make sure the passed job is 256 * carrying all necessary HBase configuration. 257 * @param addDependencyJars upload HBase jars and jars for any of the configured 258 * job classes via the distributed cache (tmpjars). 259 * @throws IOException When setting up the details fails. 260 */ 261 public static void initTableMapperJob(byte[] table, Scan scan, 262 Class<? extends TableMapper> mapper, 263 Class<?> outputKeyClass, 264 Class<?> outputValueClass, Job job, 265 boolean addDependencyJars) 266 throws IOException { 267 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, 268 outputValueClass, job, addDependencyJars, TableInputFormat.class); 269 } 270 271 /** 272 * Use this before submitting a TableMap job. It will appropriately set up 273 * the job. 274 * 275 * @param table The table name to read from. 276 * @param scan The scan instance with the columns, time range etc. 277 * @param mapper The mapper class to use. 278 * @param outputKeyClass The class of the output key. 279 * @param outputValueClass The class of the output value. 280 * @param job The current job to adjust. Make sure the passed job is 281 * carrying all necessary HBase configuration. 282 * @param addDependencyJars upload HBase jars and jars for any of the configured 283 * job classes via the distributed cache (tmpjars). 284 * @throws IOException When setting up the details fails. 285 */ 286 public static void initTableMapperJob(String table, Scan scan, 287 Class<? extends TableMapper> mapper, 288 Class<?> outputKeyClass, 289 Class<?> outputValueClass, Job job, 290 boolean addDependencyJars) 291 throws IOException { 292 initTableMapperJob(table, scan, mapper, outputKeyClass, 293 outputValueClass, job, addDependencyJars, TableInputFormat.class); 294 } 295 296 /** 297 * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on 298 * direct memory will likely cause the map tasks to OOM when opening the region. This 299 * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user 300 * wants to override this behavior in their job. 301 */ 302 public static void resetCacheConfig(Configuration conf) { 303 conf.setFloat( 304 HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); 305 conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f); 306 conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); 307 } 308 309 /** 310 * Sets up the job for reading from one or more table snapshots, with one or more scans 311 * per snapshot. 312 * It bypasses hbase servers and read directly from snapshot files. 313 * 314 * @param snapshotScans map of snapshot name to scans on that snapshot. 315 * @param mapper The mapper class to use. 316 * @param outputKeyClass The class of the output key. 317 * @param outputValueClass The class of the output value. 318 * @param job The current job to adjust. Make sure the passed job is 319 * carrying all necessary HBase configuration. 320 * @param addDependencyJars upload HBase jars and jars for any of the configured 321 * job classes via the distributed cache (tmpjars). 322 */ 323 public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans, 324 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 325 Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { 326 MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); 327 328 job.setInputFormatClass(MultiTableSnapshotInputFormat.class); 329 if (outputValueClass != null) { 330 job.setMapOutputValueClass(outputValueClass); 331 } 332 if (outputKeyClass != null) { 333 job.setMapOutputKeyClass(outputKeyClass); 334 } 335 job.setMapperClass(mapper); 336 Configuration conf = job.getConfiguration(); 337 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 338 339 if (addDependencyJars) { 340 addDependencyJars(job); 341 addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class); 342 } 343 344 resetCacheConfig(job.getConfiguration()); 345 } 346 347 /** 348 * Sets up the job for reading from a table snapshot. It bypasses hbase servers and read directly 349 * from snapshot files. 350 * @param snapshotName The name of the snapshot (of a table) to read from. 351 * @param scan The scan instance with the columns, time range etc. 352 * @param mapper The mapper class to use. 353 * @param outputKeyClass The class of the output key. 354 * @param outputValueClass The class of the output value. 355 * @param job The current job to adjust. Make sure the passed job is carrying all necessary HBase 356 * configuration. 357 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 358 * the distributed cache (tmpjars). 359 * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should 360 * have write permissions to this directory, and this should not be a subdirectory of 361 * rootdir. After the job is finished, restore directory can be deleted. 362 * @throws IOException When setting up the details fails. 363 * @see TableSnapshotInputFormat 364 */ 365 public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, 366 Class<? extends TableMapper> mapper, 367 Class<?> outputKeyClass, 368 Class<?> outputValueClass, Job job, 369 boolean addDependencyJars, Path tmpRestoreDir) 370 throws IOException { 371 TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); 372 initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job, 373 addDependencyJars, false, TableSnapshotInputFormat.class); 374 resetCacheConfig(job.getConfiguration()); 375 } 376 377 /** 378 * Sets up the job for reading from a table snapshot. It bypasses hbase servers 379 * and read directly from snapshot files. 380 * 381 * @param snapshotName The name of the snapshot (of a table) to read from. 382 * @param scan The scan instance with the columns, time range etc. 383 * @param mapper The mapper class to use. 384 * @param outputKeyClass The class of the output key. 385 * @param outputValueClass The class of the output value. 386 * @param job The current job to adjust. Make sure the passed job is 387 * carrying all necessary HBase configuration. 388 * @param addDependencyJars upload HBase jars and jars for any of the configured 389 * job classes via the distributed cache (tmpjars). 390 * 391 * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should 392 * have write permissions to this directory, and this should not be a subdirectory of rootdir. 393 * After the job is finished, restore directory can be deleted. 394 * @param splitAlgo algorithm to split 395 * @param numSplitsPerRegion how many input splits to generate per one region 396 * @throws IOException When setting up the details fails. 397 * @see TableSnapshotInputFormat 398 */ 399 public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, 400 Class<? extends TableMapper> mapper, 401 Class<?> outputKeyClass, 402 Class<?> outputValueClass, Job job, 403 boolean addDependencyJars, Path tmpRestoreDir, 404 RegionSplitter.SplitAlgorithm splitAlgo, 405 int numSplitsPerRegion) 406 throws IOException { 407 TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo, 408 numSplitsPerRegion); 409 initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, 410 outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); 411 resetCacheConfig(job.getConfiguration()); 412 } 413 414 /** 415 * Use this before submitting a Multi TableMap job. It will appropriately set 416 * up the job. 417 * 418 * @param scans The list of {@link Scan} objects to read from. 419 * @param mapper The mapper class to use. 420 * @param outputKeyClass The class of the output key. 421 * @param outputValueClass The class of the output value. 422 * @param job The current job to adjust. Make sure the passed job is carrying 423 * all necessary HBase configuration. 424 * @throws IOException When setting up the details fails. 425 */ 426 public static void initTableMapperJob(List<Scan> scans, 427 Class<? extends TableMapper> mapper, 428 Class<?> outputKeyClass, 429 Class<?> outputValueClass, Job job) throws IOException { 430 initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, 431 true); 432 } 433 434 /** 435 * Use this before submitting a Multi TableMap job. It will appropriately set 436 * up the job. 437 * 438 * @param scans The list of {@link Scan} objects to read from. 439 * @param mapper The mapper class to use. 440 * @param outputKeyClass The class of the output key. 441 * @param outputValueClass The class of the output value. 442 * @param job The current job to adjust. Make sure the passed job is carrying 443 * all necessary HBase configuration. 444 * @param addDependencyJars upload HBase jars and jars for any of the 445 * configured job classes via the distributed cache (tmpjars). 446 * @throws IOException When setting up the details fails. 447 */ 448 public static void initTableMapperJob(List<Scan> scans, 449 Class<? extends TableMapper> mapper, 450 Class<?> outputKeyClass, 451 Class<?> outputValueClass, Job job, 452 boolean addDependencyJars) throws IOException { 453 initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, 454 addDependencyJars, true); 455 } 456 457 /** 458 * Use this before submitting a Multi TableMap job. It will appropriately set 459 * up the job. 460 * 461 * @param scans The list of {@link Scan} objects to read from. 462 * @param mapper The mapper class to use. 463 * @param outputKeyClass The class of the output key. 464 * @param outputValueClass The class of the output value. 465 * @param job The current job to adjust. Make sure the passed job is carrying 466 * all necessary HBase configuration. 467 * @param addDependencyJars upload HBase jars and jars for any of the 468 * configured job classes via the distributed cache (tmpjars). 469 * @param initCredentials whether to initialize hbase auth credentials for the job 470 * @throws IOException When setting up the details fails. 471 */ 472 public static void initTableMapperJob(List<Scan> scans, 473 Class<? extends TableMapper> mapper, 474 Class<?> outputKeyClass, 475 Class<?> outputValueClass, Job job, 476 boolean addDependencyJars, 477 boolean initCredentials) throws IOException { 478 job.setInputFormatClass(MultiTableInputFormat.class); 479 if (outputValueClass != null) { 480 job.setMapOutputValueClass(outputValueClass); 481 } 482 if (outputKeyClass != null) { 483 job.setMapOutputKeyClass(outputKeyClass); 484 } 485 job.setMapperClass(mapper); 486 Configuration conf = job.getConfiguration(); 487 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 488 List<String> scanStrings = new ArrayList<>(); 489 490 for (Scan scan : scans) { 491 scanStrings.add(convertScanToString(scan)); 492 } 493 job.getConfiguration().setStrings(MultiTableInputFormat.SCANS, 494 scanStrings.toArray(new String[scanStrings.size()])); 495 496 if (addDependencyJars) { 497 addDependencyJars(job); 498 } 499 500 if (initCredentials) { 501 initCredentials(job); 502 } 503 } 504 505 public static void initCredentials(Job job) throws IOException { 506 UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); 507 if (userProvider.isHadoopSecurityEnabled()) { 508 // propagate delegation related props from launcher job to MR job 509 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { 510 job.getConfiguration().set("mapreduce.job.credentials.binary", 511 System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 512 } 513 } 514 515 if (userProvider.isHBaseSecurityEnabled()) { 516 try { 517 // init credentials for remote cluster 518 String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); 519 User user = userProvider.getCurrent(); 520 if (quorumAddress != null) { 521 Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), 522 quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); 523 Connection peerConn = ConnectionFactory.createConnection(peerConf); 524 try { 525 TokenUtil.addTokenForJob(peerConn, user, job); 526 } finally { 527 peerConn.close(); 528 } 529 } 530 531 Connection conn = ConnectionFactory.createConnection(job.getConfiguration()); 532 try { 533 TokenUtil.addTokenForJob(conn, user, job); 534 } finally { 535 conn.close(); 536 } 537 } catch (InterruptedException ie) { 538 LOG.info("Interrupted obtaining user authentication token"); 539 Thread.currentThread().interrupt(); 540 } 541 } 542 } 543 544 /** 545 * Obtain an authentication token, for the specified cluster, on behalf of the current user 546 * and add it to the credentials for the given map reduce job. 547 * 548 * The quorumAddress is the key to the ZK ensemble, which contains: 549 * hbase.zookeeper.quorum, hbase.zookeeper.client.port and 550 * zookeeper.znode.parent 551 * 552 * @param job The job that requires the permission. 553 * @param quorumAddress string that contains the 3 required configuratins 554 * @throws IOException When the authentication token cannot be obtained. 555 * @deprecated Since 1.2.0 and will be removed in 3.0.0. Use 556 * {@link #initCredentialsForCluster(Job, Configuration)} instead. 557 * @see #initCredentialsForCluster(Job, Configuration) 558 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14886">HBASE-14886</a> 559 */ 560 @Deprecated 561 public static void initCredentialsForCluster(Job job, String quorumAddress) 562 throws IOException { 563 Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), 564 quorumAddress); 565 initCredentialsForCluster(job, peerConf); 566 } 567 568 /** 569 * Obtain an authentication token, for the specified cluster, on behalf of the current user 570 * and add it to the credentials for the given map reduce job. 571 * 572 * @param job The job that requires the permission. 573 * @param conf The configuration to use in connecting to the peer cluster 574 * @throws IOException When the authentication token cannot be obtained. 575 */ 576 public static void initCredentialsForCluster(Job job, Configuration conf) 577 throws IOException { 578 UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); 579 if (userProvider.isHBaseSecurityEnabled()) { 580 try { 581 Connection peerConn = ConnectionFactory.createConnection(conf); 582 try { 583 TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); 584 } finally { 585 peerConn.close(); 586 } 587 } catch (InterruptedException e) { 588 LOG.info("Interrupted obtaining user authentication token"); 589 Thread.interrupted(); 590 } 591 } 592 } 593 594 /** 595 * Writes the given scan into a Base64 encoded string. 596 * 597 * @param scan The scan to write out. 598 * @return The scan saved in a Base64 encoded string. 599 * @throws IOException When writing the scan fails. 600 */ 601 public static String convertScanToString(Scan scan) throws IOException { 602 ClientProtos.Scan proto = ProtobufUtil.toScan(scan); 603 return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray())); 604 } 605 606 /** 607 * Converts the given Base64 string back into a Scan instance. 608 * 609 * @param base64 The scan details. 610 * @return The newly created Scan instance. 611 * @throws IOException When reading the scan instance fails. 612 */ 613 public static Scan convertStringToScan(String base64) throws IOException { 614 byte [] decoded = Base64.getDecoder().decode(base64); 615 return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded)); 616 } 617 618 /** 619 * Use this before submitting a TableReduce job. It will 620 * appropriately set up the JobConf. 621 * 622 * @param table The output table. 623 * @param reducer The reducer class to use. 624 * @param job The current job to adjust. 625 * @throws IOException When determining the region count fails. 626 */ 627 public static void initTableReducerJob(String table, 628 Class<? extends TableReducer> reducer, Job job) 629 throws IOException { 630 initTableReducerJob(table, reducer, job, null); 631 } 632 633 /** 634 * Use this before submitting a TableReduce job. It will 635 * appropriately set up the JobConf. 636 * 637 * @param table The output table. 638 * @param reducer The reducer class to use. 639 * @param job The current job to adjust. 640 * @param partitioner Partitioner to use. Pass <code>null</code> to use 641 * default partitioner. 642 * @throws IOException When determining the region count fails. 643 */ 644 public static void initTableReducerJob(String table, 645 Class<? extends TableReducer> reducer, Job job, 646 Class partitioner) throws IOException { 647 initTableReducerJob(table, reducer, job, partitioner, null, null, null); 648 } 649 650 /** 651 * Use this before submitting a TableReduce job. It will 652 * appropriately set up the JobConf. 653 * 654 * @param table The output table. 655 * @param reducer The reducer class to use. 656 * @param job The current job to adjust. Make sure the passed job is 657 * carrying all necessary HBase configuration. 658 * @param partitioner Partitioner to use. Pass <code>null</code> to use 659 * default partitioner. 660 * @param quorumAddress Distant cluster to write to; default is null for 661 * output to the cluster that is designated in <code>hbase-site.xml</code>. 662 * Set this String to the zookeeper ensemble of an alternate remote cluster 663 * when you would have the reduce write a cluster that is other than the 664 * default; e.g. copying tables between clusters, the source would be 665 * designated by <code>hbase-site.xml</code> and this param would have the 666 * ensemble address of the remote cluster. The format to pass is particular. 667 * Pass <code> <hbase.zookeeper.quorum>:< 668 * hbase.zookeeper.client.port>:<zookeeper.znode.parent> 669 * </code> such as <code>server,server2,server3:2181:/hbase</code>. 670 * @param serverClass redefined hbase.regionserver.class 671 * @param serverImpl redefined hbase.regionserver.impl 672 * @throws IOException When determining the region count fails. 673 */ 674 public static void initTableReducerJob(String table, 675 Class<? extends TableReducer> reducer, Job job, 676 Class partitioner, String quorumAddress, String serverClass, 677 String serverImpl) throws IOException { 678 initTableReducerJob(table, reducer, job, partitioner, quorumAddress, 679 serverClass, serverImpl, true); 680 } 681 682 /** 683 * Use this before submitting a TableReduce job. It will 684 * appropriately set up the JobConf. 685 * 686 * @param table The output table. 687 * @param reducer The reducer class to use. 688 * @param job The current job to adjust. Make sure the passed job is 689 * carrying all necessary HBase configuration. 690 * @param partitioner Partitioner to use. Pass <code>null</code> to use 691 * default partitioner. 692 * @param quorumAddress Distant cluster to write to; default is null for 693 * output to the cluster that is designated in <code>hbase-site.xml</code>. 694 * Set this String to the zookeeper ensemble of an alternate remote cluster 695 * when you would have the reduce write a cluster that is other than the 696 * default; e.g. copying tables between clusters, the source would be 697 * designated by <code>hbase-site.xml</code> and this param would have the 698 * ensemble address of the remote cluster. The format to pass is particular. 699 * Pass <code> <hbase.zookeeper.quorum>:< 700 * hbase.zookeeper.client.port>:<zookeeper.znode.parent> 701 * </code> such as <code>server,server2,server3:2181:/hbase</code>. 702 * @param serverClass redefined hbase.regionserver.class 703 * @param serverImpl redefined hbase.regionserver.impl 704 * @param addDependencyJars upload HBase jars and jars for any of the configured 705 * job classes via the distributed cache (tmpjars). 706 * @throws IOException When determining the region count fails. 707 */ 708 public static void initTableReducerJob(String table, 709 Class<? extends TableReducer> reducer, Job job, 710 Class partitioner, String quorumAddress, String serverClass, 711 String serverImpl, boolean addDependencyJars) throws IOException { 712 713 Configuration conf = job.getConfiguration(); 714 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 715 job.setOutputFormatClass(TableOutputFormat.class); 716 if (reducer != null) job.setReducerClass(reducer); 717 conf.set(TableOutputFormat.OUTPUT_TABLE, table); 718 conf.setStrings("io.serializations", conf.get("io.serializations"), 719 MutationSerialization.class.getName(), ResultSerialization.class.getName()); 720 // If passed a quorum/ensemble address, pass it on to TableOutputFormat. 721 if (quorumAddress != null) { 722 // Calling this will validate the format 723 ZKConfig.validateClusterKey(quorumAddress); 724 conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); 725 } 726 if (serverClass != null && serverImpl != null) { 727 conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); 728 conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl); 729 } 730 job.setOutputKeyClass(ImmutableBytesWritable.class); 731 job.setOutputValueClass(Writable.class); 732 if (partitioner == HRegionPartitioner.class) { 733 job.setPartitionerClass(HRegionPartitioner.class); 734 int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table)); 735 if (job.getNumReduceTasks() > regions) { 736 job.setNumReduceTasks(regions); 737 } 738 } else if (partitioner != null) { 739 job.setPartitionerClass(partitioner); 740 } 741 742 if (addDependencyJars) { 743 addDependencyJars(job); 744 } 745 746 initCredentials(job); 747 } 748 749 /** 750 * Ensures that the given number of reduce tasks for the given job 751 * configuration does not exceed the number of regions for the given table. 752 * 753 * @param table The table to get the region count for. 754 * @param job The current job to adjust. 755 * @throws IOException When retrieving the table details fails. 756 */ 757 public static void limitNumReduceTasks(String table, Job job) 758 throws IOException { 759 int regions = 760 MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table)); 761 if (job.getNumReduceTasks() > regions) 762 job.setNumReduceTasks(regions); 763 } 764 765 /** 766 * Sets the number of reduce tasks for the given job configuration to the 767 * number of regions the given table has. 768 * 769 * @param table The table to get the region count for. 770 * @param job The current job to adjust. 771 * @throws IOException When retrieving the table details fails. 772 */ 773 public static void setNumReduceTasks(String table, Job job) 774 throws IOException { 775 job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(), 776 TableName.valueOf(table))); 777 } 778 779 /** 780 * Sets the number of rows to return and cache with each scanner iteration. 781 * Higher caching values will enable faster mapreduce jobs at the expense of 782 * requiring more heap to contain the cached rows. 783 * 784 * @param job The current job to adjust. 785 * @param batchSize The number of rows to return in batch with each scanner 786 * iteration. 787 */ 788 public static void setScannerCaching(Job job, int batchSize) { 789 job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize); 790 } 791 792 /** 793 * Add HBase and its dependencies (only) to the job configuration. 794 * <p> 795 * This is intended as a low-level API, facilitating code reuse between this 796 * class and its mapred counterpart. It also of use to external tools that 797 * need to build a MapReduce job that interacts with HBase but want 798 * fine-grained control over the jars shipped to the cluster. 799 * </p> 800 * @param conf The Configuration object to extend with dependencies. 801 * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil 802 * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a> 803 */ 804 public static void addHBaseDependencyJars(Configuration conf) throws IOException { 805 addDependencyJarsForClasses(conf, 806 // explicitly pull a class from each module 807 org.apache.hadoop.hbase.HConstants.class, // hbase-common 808 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol 809 org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded 810 org.apache.hadoop.hbase.client.Put.class, // hbase-client 811 org.apache.hadoop.hbase.ipc.RpcServer.class, // hbase-server 812 org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat 813 org.apache.hadoop.hbase.mapreduce.JobUtil.class, // hbase-hadoop2-compat 814 org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-mapreduce 815 org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class, // hbase-metrics 816 org.apache.hadoop.hbase.metrics.Snapshot.class, // hbase-metrics-api 817 org.apache.hadoop.hbase.replication.ReplicationUtils.class, // hbase-replication 818 org.apache.hadoop.hbase.http.HttpServer.class, // hbase-http 819 org.apache.hadoop.hbase.procedure2.Procedure.class, // hbase-procedure 820 org.apache.hadoop.hbase.zookeeper.ZKWatcher.class, // hbase-zookeeper 821 org.apache.hbase.thirdparty.com.google.common.collect.Lists.class, // hb-shaded-miscellaneous 822 org.apache.hbase.thirdparty.com.google.gson.GsonBuilder.class, // hbase-shaded-gson 823 org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class, // hb-sh-protobuf 824 org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty 825 org.apache.zookeeper.ZooKeeper.class, // zookeeper 826 com.google.protobuf.Message.class, // protobuf 827 org.apache.htrace.core.Tracer.class, // htrace 828 com.codahale.metrics.MetricRegistry.class, // metrics-core 829 org.apache.commons.lang3.ArrayUtils.class); // commons-lang 830 } 831 832 /** 833 * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. 834 * Also exposed to shell scripts via `bin/hbase mapredcp`. 835 */ 836 public static String buildDependencyClasspath(Configuration conf) { 837 if (conf == null) { 838 throw new IllegalArgumentException("Must provide a configuration object."); 839 } 840 Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars")); 841 if (paths.isEmpty()) { 842 throw new IllegalArgumentException("Configuration contains no tmpjars."); 843 } 844 StringBuilder sb = new StringBuilder(); 845 for (String s : paths) { 846 // entries can take the form 'file:/path/to/file.jar'. 847 int idx = s.indexOf(":"); 848 if (idx != -1) s = s.substring(idx + 1); 849 if (sb.length() > 0) sb.append(File.pathSeparator); 850 sb.append(s); 851 } 852 return sb.toString(); 853 } 854 855 /** 856 * Add the HBase dependency jars as well as jars for any of the configured 857 * job classes to the job configuration, so that JobClient will ship them 858 * to the cluster and add them to the DistributedCache. 859 */ 860 public static void addDependencyJars(Job job) throws IOException { 861 addHBaseDependencyJars(job.getConfiguration()); 862 try { 863 addDependencyJarsForClasses(job.getConfiguration(), 864 // when making changes here, consider also mapred.TableMapReduceUtil 865 // pull job classes 866 job.getMapOutputKeyClass(), 867 job.getMapOutputValueClass(), 868 job.getInputFormatClass(), 869 job.getOutputKeyClass(), 870 job.getOutputValueClass(), 871 job.getOutputFormatClass(), 872 job.getPartitionerClass(), 873 job.getCombinerClass()); 874 } catch (ClassNotFoundException e) { 875 throw new IOException(e); 876 } 877 } 878 879 /** 880 * Add the jars containing the given classes to the job's configuration 881 * such that JobClient will ship them to the cluster and add them to 882 * the DistributedCache. 883 * @deprecated since 1.3.0 and will be removed in 3.0.0. Use {@link #addDependencyJars(Job)} 884 * instead. 885 * @see #addDependencyJars(Job) 886 * @see <a href="https://issues.apache.org/jira/browse/HBASE-8386">HBASE-8386</a> 887 */ 888 @Deprecated 889 public static void addDependencyJars(Configuration conf, 890 Class<?>... classes) throws IOException { 891 LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it" 892 + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " + 893 "instead. See HBASE-8386 for more details."); 894 addDependencyJarsForClasses(conf, classes); 895 } 896 897 /** 898 * Add the jars containing the given classes to the job's configuration 899 * such that JobClient will ship them to the cluster and add them to 900 * the DistributedCache. 901 * 902 * N.B. that this method at most adds one jar per class given. If there is more than one 903 * jar available containing a class with the same name as a given class, we don't define 904 * which of those jars might be chosen. 905 * 906 * @param conf The Hadoop Configuration to modify 907 * @param classes will add just those dependencies needed to find the given classes 908 * @throws IOException if an underlying library call fails. 909 */ 910 @InterfaceAudience.Private 911 public static void addDependencyJarsForClasses(Configuration conf, 912 Class<?>... classes) throws IOException { 913 914 FileSystem localFs = FileSystem.getLocal(conf); 915 Set<String> jars = new HashSet<>(); 916 // Add jars that are already in the tmpjars variable 917 jars.addAll(conf.getStringCollection("tmpjars")); 918 919 // add jars as we find them to a map of contents jar name so that we can avoid 920 // creating new jars for classes that have already been packaged. 921 Map<String, String> packagedClasses = new HashMap<>(); 922 923 // Add jars containing the specified classes 924 for (Class<?> clazz : classes) { 925 if (clazz == null) continue; 926 927 Path path = findOrCreateJar(clazz, localFs, packagedClasses); 928 if (path == null) { 929 LOG.warn("Could not find jar for class " + clazz + 930 " in order to ship it to the cluster."); 931 continue; 932 } 933 if (!localFs.exists(path)) { 934 LOG.warn("Could not validate jar file " + path + " for class " 935 + clazz); 936 continue; 937 } 938 jars.add(path.toString()); 939 } 940 if (jars.isEmpty()) return; 941 942 conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); 943 } 944 945 /** 946 * Finds the Jar for a class or creates it if it doesn't exist. If the class is in 947 * a directory in the classpath, it creates a Jar on the fly with the 948 * contents of the directory and returns the path to that Jar. If a Jar is 949 * created, it is created in the system temporary directory. Otherwise, 950 * returns an existing jar that contains a class of the same name. Maintains 951 * a mapping from jar contents to the tmp jar created. 952 * @param my_class the class to find. 953 * @param fs the FileSystem with which to qualify the returned path. 954 * @param packagedClasses a map of class name to path. 955 * @return a jar file that contains the class. 956 * @throws IOException 957 */ 958 private static Path findOrCreateJar(Class<?> my_class, FileSystem fs, 959 Map<String, String> packagedClasses) 960 throws IOException { 961 // attempt to locate an existing jar for the class. 962 String jar = findContainingJar(my_class, packagedClasses); 963 if (null == jar || jar.isEmpty()) { 964 jar = getJar(my_class); 965 updateMap(jar, packagedClasses); 966 } 967 968 if (null == jar || jar.isEmpty()) { 969 return null; 970 } 971 972 LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar)); 973 return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory()); 974 } 975 976 /** 977 * Add entries to <code>packagedClasses</code> corresponding to class files 978 * contained in <code>jar</code>. 979 * @param jar The jar who's content to list. 980 * @param packagedClasses map[class -> jar] 981 */ 982 private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException { 983 if (null == jar || jar.isEmpty()) { 984 return; 985 } 986 ZipFile zip = null; 987 try { 988 zip = new ZipFile(jar); 989 for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) { 990 ZipEntry entry = iter.nextElement(); 991 if (entry.getName().endsWith("class")) { 992 packagedClasses.put(entry.getName(), jar); 993 } 994 } 995 } finally { 996 if (null != zip) zip.close(); 997 } 998 } 999 1000 /** 1001 * Find a jar that contains a class of the same name, if any. It will return 1002 * a jar file, even if that is not the first thing on the class path that 1003 * has a class with the same name. Looks first on the classpath and then in 1004 * the <code>packagedClasses</code> map. 1005 * @param my_class the class to find. 1006 * @return a jar file that contains the class, or null. 1007 * @throws IOException 1008 */ 1009 private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses) 1010 throws IOException { 1011 ClassLoader loader = my_class.getClassLoader(); 1012 1013 String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; 1014 1015 if (loader != null) { 1016 // first search the classpath 1017 for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) { 1018 URL url = itr.nextElement(); 1019 if ("jar".equals(url.getProtocol())) { 1020 String toReturn = url.getPath(); 1021 if (toReturn.startsWith("file:")) { 1022 toReturn = toReturn.substring("file:".length()); 1023 } 1024 // URLDecoder is a misnamed class, since it actually decodes 1025 // x-www-form-urlencoded MIME type rather than actual 1026 // URL encoding (which the file path has). Therefore it would 1027 // decode +s to ' 's which is incorrect (spaces are actually 1028 // either unencoded or encoded as "%20"). Replace +s first, so 1029 // that they are kept sacred during the decoding process. 1030 toReturn = toReturn.replaceAll("\\+", "%2B"); 1031 toReturn = URLDecoder.decode(toReturn, "UTF-8"); 1032 return toReturn.replaceAll("!.*$", ""); 1033 } 1034 } 1035 } 1036 1037 // now look in any jars we've packaged using JarFinder. Returns null when 1038 // no jar is found. 1039 return packagedClasses.get(class_file); 1040 } 1041 1042 /** 1043 * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job 1044 * configuration contexts (HBASE-8140) and also for testing on MRv2. 1045 * check if we have HADOOP-9426. 1046 * @param my_class the class to find. 1047 * @return a jar file that contains the class, or null. 1048 */ 1049 private static String getJar(Class<?> my_class) { 1050 String ret = null; 1051 try { 1052 ret = JarFinder.getJar(my_class); 1053 } catch (Exception e) { 1054 // toss all other exceptions, related to reflection failure 1055 throw new RuntimeException("getJar invocation failed.", e); 1056 } 1057 1058 return ret; 1059 } 1060}