/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import com.codahale.metrics.MetricRegistry;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
public class TableMapReduceUtil {
  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);
  public static final String TABLE_INPUT_CLASS_KEY = "hbase.table.input.class";

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            The table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            The table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(), scan, mapper, outputKeyClass, outputValueClass, job,
      true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            Binary representation of the table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
    throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, true, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param initCredentials   whether to initialize hbase auth credentials for the job
   * @param inputFormatClass  the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, boolean initCredentials,
    Class<? extends InputFormat> inputFormatClass) throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             Binary representation of the table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
    throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, inputFormatClass);
  }
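  // Illustrative usage sketch (not part of the upstream API): a minimal driver wiring a
  // TableMapper into a job with the overloads above. The table name, scan tuning values and
  // MyTableMapper are hypothetical placeholders; Text/IntWritable come from org.apache.hadoop.io.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "ExampleTableScan");
  //   job.setJarByClass(MyTableMapper.class);
  //   Scan scan = new Scan();
  //   scan.setCaching(500);        // larger scanner caching usually speeds up MR scans
  //   scan.setCacheBlocks(false);  // avoid polluting the region server block cache
  //   TableMapReduceUtil.initTableMapperJob("exampleTable", scan, MyTableMapper.class,
  //     Text.class, IntWritable.class, job);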
  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             Binary representation of the table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars) throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, getConfiguredInputFormat(job));
  }

  /**
   * @return {@link TableInputFormat}.class unless Configuration has something else at
   *         {@link #TABLE_INPUT_CLASS_KEY}.
   */
  private static Class<? extends InputFormat> getConfiguredInputFormat(Job job) {
    return (Class<? extends InputFormat>) job.getConfiguration().getClass(TABLE_INPUT_CLASS_KEY,
      TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars) throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, getConfiguredInputFormat(job));
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on direct
   * memory will likely cause the map tasks to OOM when opening the region. This is done here
   * instead of in TableSnapshotRegionRecordReader in case an advanced user wants to override this
   * behavior in their job.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans per
   * snapshot. It bypasses hbase servers and reads directly from snapshot files.
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
    }

    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
   * @param snapshotName      The name of the snapshot (of a table) to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. Current user
   *                          should have write permissions to this directory, and this should not
   *                          be a subdirectory of rootdir. After the job is finished, restore
   *                          directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
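  // Illustrative usage sketch (not part of the upstream API): reading a snapshot instead of a live
  // table. The snapshot name, restore directory and MySnapshotMapper are hypothetical; the restore
  // directory must be writable by the current user and must not sit under the HBase root directory.
  //
  //   Job job = Job.getInstance(HBaseConfiguration.create(), "ExampleSnapshotScan");
  //   Path restoreDir = new Path("/tmp/snapshot-restore");
  //   TableMapReduceUtil.initTableSnapshotMapperJob("exampleSnapshot", new Scan(),
  //     MySnapshotMapper.class, Text.class, LongWritable.class, job, true, restoreDir);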
  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
   * @param snapshotName       The name of the snapshot (of a table) to read from.
   * @param scan               The scan instance with the columns, time range etc.
   * @param mapper             The mapper class to use.
   * @param outputKeyClass     The class of the output key.
   * @param outputValueClass   The class of the output value.
   * @param job                The current job to adjust. Make sure the passed job is carrying all
   *                           necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job classes via
   *                           the distributed cache (tmpjars).
   * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. Current user
   *                           should have write permissions to this directory, and this should not
   *                           be a subdirectory of rootdir. After the job is finished, restore
   *                           directory can be deleted.
   * @param splitAlgo          algorithm to split
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir, RegionSplitter.SplitAlgorithm splitAlgo,
    int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans            The list of {@link Scan} objects to read from.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans             The list of {@link Scan} objects to read from.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars)
    throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, addDependencyJars,
      true);
  }
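  // Illustrative usage sketch (not part of the upstream API): mapping over several tables in one
  // job by tagging each Scan with the table it should read (the table name is carried as a scan
  // attribute). Table names and MyMultiTableMapper are hypothetical placeholders.
  //
  //   List<Scan> scans = new ArrayList<>();
  //   for (String name : new String[] { "tableA", "tableB" }) {
  //     Scan scan = new Scan();
  //     scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(name));
  //     scans.add(scan);
  //   }
  //   TableMapReduceUtil.initTableMapperJob(scans, MyMultiTableMapper.class, Text.class,
  //     IntWritable.class, job);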
  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans             The list of {@link Scan} objects to read from.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param initCredentials   whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars,
    boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
      scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }

  private static void addTokenForJob(IOExceptionSupplier<Connection> connSupplier, User user,
    Job job) throws IOException, InterruptedException {
    try (Connection conn = connSupplier.get()) {
      TokenUtil.addTokenForJob(conn, user, job);
    }
  }

  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
          System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      User user = userProvider.getCurrent();
      try {
        // init credentials for remote cluster
        String outputCluster = job.getConfiguration().get(TableOutputFormat.OUTPUT_CLUSTER);
        if (!StringUtils.isBlank(outputCluster)) {
          addTokenForJob(() -> {
            URI uri;
            try {
              uri = new URI(outputCluster);
            } catch (URISyntaxException e) {
              throw new IOException("malformed connection uri: " + outputCluster
                + ", please check config " + TableOutputFormat.OUTPUT_CLUSTER, e);
            }
            return ConnectionFactory.createConnection(uri, job.getConfiguration());
          }, user, job);
        }
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        if (!StringUtils.isBlank(quorumAddress)) {
          addTokenForJob(() -> {
            Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
            return ConnectionFactory.createConnection(peerConf, user);
          }, user, job);
        }
        // init credentials for source cluster
        addTokenForJob(() -> ConnectionFactory.createConnection(job.getConfiguration()), user, job);
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user and
   * add it to the credentials for the given map reduce job.
   * @param job  The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf) throws IOException {
    initCredentialsForCluster(job, conf, null);
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user and
   * add it to the credentials for the given map reduce job.
   * @param job  The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @param uri  The connection uri for the given peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf, URI uri)
    throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        addTokenForJob(() -> ConnectionFactory.createConnection(uri, conf),
          userProvider.getCurrent(), job);
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Writes the given scan into a Base64 encoded string.
   * @param scan The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  public static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray()));
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   * @param base64 The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  public static Scan convertStringToScan(String base64) throws IOException {
    byte[] decoded = Base64.getDecoder().decode(base64);
    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
  }
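  // Illustrative sketch (not part of the upstream API): the Base64 form produced by
  // convertScanToString is what TableInputFormat reads back out of the job configuration, so a
  // Scan survives a round trip through a plain String property.
  //
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("cf"));
  //   String encoded = TableMapReduceUtil.convertScanToString(scan);
  //   Scan decoded = TableMapReduceUtil.convertStringToScan(encoded);  // same scan settings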
  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table   The output table.
   * @param reducer The reducer class to use.
   * @param job     The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job) throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table       The output table.
   * @param reducer     The reducer class to use.
   * @param job         The current job to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, (URI) null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table         The output table.
   * @param reducer       The reducer class to use.
   * @param job           The current job to adjust. Make sure the passed job is carrying all
   *                      necessary HBase configuration.
   * @param partitioner   Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for output to the cluster
   *                      that is designated in <code>hbase-site.xml</code>. Set this String to the
   *                      zookeeper ensemble of an alternate remote cluster when you would have the
   *                      reduce write to a cluster other than the default; e.g. when copying tables
   *                      between clusters, the source would be designated by
   *                      <code>hbase-site.xml</code> and this param would have the ensemble address
   *                      of the remote cluster. The format to pass is particular. Pass
   *                      <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>
   *                      such as <code>server,server2,server3:2181:/hbase</code>.
   * @throws IOException When determining the region count fails.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use
   *             {@link #initTableReducerJob(String, Class, Job, Class, URI)} instead, where we use
   *             the connection uri to specify the target cluster.
   */
  @Deprecated
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress     Distant cluster to write to; default is null for output to the
   *                          cluster that is designated in <code>hbase-site.xml</code>. Set this
   *                          String to the zookeeper ensemble of an alternate remote cluster when
   *                          you would have the reduce write to a cluster other than the default;
   *                          e.g. when copying tables between clusters, the source would be
   *                          designated by <code>hbase-site.xml</code> and this param would have
   *                          the ensemble address of the remote cluster. The format to pass is
   *                          particular. Pass
   *                          <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>
   *                          such as <code>server,server2,server3:2181:/hbase</code>.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use
   *             {@link #initTableReducerJob(String, Class, Job, Class, URI, boolean)} instead,
   *             where we use the connection uri to specify the target cluster.
   */
  @Deprecated
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, boolean addDependencyJars)
    throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, () -> {
      // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
      if (quorumAddress != null) {
        // Calling this will validate the format
        ZKConfig.validateClusterKey(quorumAddress);
        job.getConfiguration().set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
      }
    }, addDependencyJars);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table         The output table.
   * @param reducer       The reducer class to use.
   * @param job           The current job to adjust. Make sure the passed job is carrying all
   *                      necessary HBase configuration.
   * @param partitioner   Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param outputCluster The HBase cluster you want to write to. Default is null which means output
   *                      to the same cluster you read from, i.e., the cluster used when
   *                      initializing with the job's Configuration instance.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, URI outputCluster) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, outputCluster, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param outputCluster     The HBase cluster you want to write to. Default is null which means
   *                          output to the same cluster you read from, i.e., the cluster used when
   *                          initializing with the job's Configuration instance.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, URI outputCluster, boolean addDependencyJars) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, () -> {
      if (outputCluster != null) {
        ConnectionRegistryFactory.validate(outputCluster);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_CLUSTER, outputCluster.toString());
      }
    }, addDependencyJars);
  }
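  // Illustrative usage sketch (not part of the upstream API): wiring the reduce phase to write
  // into an HBase table, optionally on a different cluster addressed by a connection URI. The
  // table name, the URI value and MyTableReducer are hypothetical placeholders.
  //
  //   TableMapReduceUtil.initTableReducerJob("outputTable", MyTableReducer.class, job);
  //
  //   // or, assuming a ZooKeeper-based connection URI for a separate output cluster:
  //   URI peer = URI.create("hbase+zk://zk-host:2181/hbase");
  //   TableMapReduceUtil.initTableReducerJob("outputTable", MyTableReducer.class, job,
  //     HRegionPartitioner.class, peer);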
  private static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, IOExceptionRunnable setOutputCluster, boolean addDependencyJars)
    throws IOException {
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) {
      job.setReducerClass(reducer);
    }
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    setOutputCluster.run();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table         The output table.
   * @param reducer       The reducer class to use.
   * @param job           The current job to adjust. Make sure the passed job is carrying all
   *                      necessary HBase configuration.
   * @param partitioner   Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for output to the cluster
   *                      that is designated in <code>hbase-site.xml</code>. Set this String to the
   *                      zookeeper ensemble of an alternate remote cluster when you would have the
   *                      reduce write to a cluster other than the default; e.g. when copying tables
   *                      between clusters, the source would be designated by
   *                      <code>hbase-site.xml</code> and this param would have the ensemble address
   *                      of the remote cluster. The format to pass is particular. Pass
   *                      <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>
   *                      such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass   redefined hbase.regionserver.class
   * @param serverImpl    redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   * @deprecated Since 2.5.9, 2.6.1, 2.7.0, will be removed in 4.0.0. The {@code serverClass} and
   *             {@code serverImpl} do not take effect any more, just use
   *             {@link #initTableReducerJob(String, Class, Job, Class, String)} instead.
   * @see #initTableReducerJob(String, Class, Job, Class, String)
   */
  @Deprecated
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl)
    throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress     Distant cluster to write to; default is null for output to the
   *                          cluster that is designated in <code>hbase-site.xml</code>. Set this
   *                          String to the zookeeper ensemble of an alternate remote cluster when
   *                          you would have the reduce write to a cluster other than the default;
   *                          e.g. when copying tables between clusters, the source would be
   *                          designated by <code>hbase-site.xml</code> and this param would have
   *                          the ensemble address of the remote cluster. The format to pass is
   *                          particular. Pass
   *                          <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>
   *                          such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass       redefined hbase.regionserver.class
   * @param serverImpl        redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   * @deprecated Since 2.5.9, 2.6.1, 2.7.0, will be removed in 4.0.0. The {@code serverClass} and
   *             {@code serverImpl} do not take effect any more, just use
   *             {@link #initTableReducerJob(String, Class, Job, Class, String, boolean)} instead.
   * @see #initTableReducerJob(String, Class, Job, Class, String, boolean)
   */
  @Deprecated
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl,
    boolean addDependencyJars) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress, addDependencyJars);
  }

  /**
   * Ensures that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job) throws IOException {
    int regions = getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job) throws IOException {
    job.setNumReduceTasks(getRegionCount(job.getConfiguration(), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration. Higher caching values
   * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached
   * rows.
   * @param job       The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }
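  // Illustrative tuning sketch (not part of the upstream API): cap the reducer count at the region
  // count of a hypothetical output table and raise scanner caching for a scan-heavy map phase.
  //
  //   TableMapReduceUtil.limitNumReduceTasks("outputTable", job);
  //   TableMapReduceUtil.setScannerCaching(job, 500);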
  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this class and its mapred
   * counterpart. It is also of use to external tools that need to build a MapReduce job that
   * interacts with HBase but want fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJarsForClasses(conf,
      // explicitly pull a class from each module
      org.apache.hadoop.hbase.HConstants.class, // hbase-common
      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
      org.apache.hadoop.hbase.client.Put.class, // hbase-client
      org.apache.hadoop.hbase.ipc.RpcServer.class, // hbase-server
      org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat
      org.apache.hadoop.hbase.mapreduce.JobUtil.class, // hbase-hadoop2-compat
      org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-mapreduce
      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class, // hbase-metrics
      org.apache.hadoop.hbase.metrics.Snapshot.class, // hbase-metrics-api
      org.apache.hadoop.hbase.replication.ReplicationUtils.class, // hbase-replication
      org.apache.hadoop.hbase.http.HttpServer.class, // hbase-http
      org.apache.hadoop.hbase.procedure2.Procedure.class, // hbase-procedure
      org.apache.hadoop.hbase.zookeeper.ZKWatcher.class, // hbase-zookeeper
      org.apache.hbase.thirdparty.com.google.common.collect.Lists.class, // hb-shaded-miscellaneous
      org.apache.hbase.thirdparty.com.google.gson.GsonBuilder.class, // hbase-shaded-gson
      org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class, // hb-sh-protobuf
      org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty
      org.apache.hadoop.hbase.unsafe.HBasePlatformDependent.class, // hbase-unsafe
      org.apache.zookeeper.ZooKeeper.class, // zookeeper
      com.codahale.metrics.MetricRegistry.class, // metrics-core
      org.apache.commons.lang3.ArrayUtils.class, // commons-lang
      io.opentelemetry.api.trace.Span.class, // opentelemetry-api
      io.opentelemetry.semconv.trace.attributes.SemanticAttributes.class, // opentelemetry-semconv
      io.opentelemetry.context.Context.class); // opentelemetry-context
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. Also
   * exposed to shell scripts via `bin/hbase mapredcp`.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
    if (paths.isEmpty()) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }
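  // Illustrative sketch (not part of the upstream API): an external tool (e.g. a Pig launcher, see
  // PIG-3285 above) can pull in only the HBase dependency jars and then turn the resulting
  // "tmpjars" value into a local classpath string.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   TableMapReduceUtil.addHBaseDependencyJars(conf);
  //   String classpath = TableMapReduceUtil.buildDependencyClasspath(conf);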
  /**
   * Add the HBase dependency jars as well as jars for any of the configured job classes to the job
   * configuration, so that JobClient will ship them to the cluster and add them to the
   * DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJarsForClasses(job.getConfiguration(),
        // when making changes here, consider also mapred.TableMapReduceUtil
        // pull job classes
        job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getInputFormatClass(),
        job.getOutputKeyClass(), job.getOutputValueClass(), job.getOutputFormatClass(),
        job.getPartitionerClass(), job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration such that JobClient will
   * ship them to the cluster and add them to the DistributedCache. N.B. that this method at most
   * adds one jar per class given. If there is more than one jar available containing a class with
   * the same name as a given class, we don't define which of those jars might be chosen.
   * @param conf    The Hadoop Configuration to modify
   * @param classes will add just those dependencies needed to find the given classes
   * @throws IOException if an underlying library call fails.
   */
  @InterfaceAudience.Private
  public static void addDependencyJarsForClasses(Configuration conf, Class<?>... classes)
    throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents to jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) {
      return;
    }
    conf.set("tmpjars", jars.stream().collect(Collectors.joining(",")));
  }

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in a directory in
   * the classpath, it creates a Jar on the fly with the contents of the directory and returns the
   * path to that Jar. If a Jar is created, it is created in the system temporary directory.
   * Otherwise, returns an existing jar that contains a class of the same name. Maintains a mapping
   * from jar contents to the tmp jar created.
   * @param my_class        the class to find.
   * @param fs              the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
    Map<String, String> packagedClasses) throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files contained in
   * <code>jar</code>.
   * @param jar             The jar whose contents to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses)
    throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return a jar file, even if
   * that is not the first thing on the class path that has a class with the same name. Looks first
   * on the classpath and then in the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
    throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job configuration
   * contexts (HBASE-8140) and also for testing on MRv2. Check if we have HADOOP-9426.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // toss all other exceptions, related to reflection failure
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}