/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import com.codahale.metrics.MetricRegistry;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
public class TableMapReduceUtil {
  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);
  public static final String TABLE_INPUT_CLASS_KEY = "hbase.table.input.class";

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            The table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, true);
  }
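  // Illustrative usage (a sketch, not part of this class): a minimal map-only scan job, assuming
  // a table named "mytable" and a hypothetical MyMapper that extends TableMapper<Text, Text>:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "scan mytable");
  //   Scan scan = new Scan();
  //   TableMapReduceUtil.initTableMapperJob("mytable", scan, MyMapper.class,
  //     Text.class, Text.class, job);
  //   job.setNumReduceTasks(0);
  //   System.exit(job.waitForCompletion(true) ? 0 : 1);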

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            The table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(), scan, mapper, outputKeyClass, outputValueClass, job,
      true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            Binary representation of the table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
    throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, true, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param initCredentials   whether to initialize hbase auth credentials for the job
   * @param inputFormatClass  the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, boolean initCredentials,
    Class<? extends InputFormat> inputFormatClass) throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             Binary representation of the table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
    throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             Binary representation of the table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars) throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, getConfiguredInputFormat(job));
  }

  /**
   * @return {@link TableInputFormat}.class unless Configuration has something else at
   *         {@link #TABLE_INPUT_CLASS_KEY}.
   */
  private static Class<? extends InputFormat> getConfiguredInputFormat(Job job) {
    return (Class<? extends InputFormat>) job.getConfiguration().getClass(TABLE_INPUT_CLASS_KEY,
      TableInputFormat.class);
  }
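  // Illustrative note (sketch): callers can swap the input format by setting this key on the job
  // configuration before init, e.g. with a hypothetical MyTableInputFormat subclass:
  //
  //   job.getConfiguration().setClass(TABLE_INPUT_CLASS_KEY, MyTableInputFormat.class,
  //     InputFormat.class);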

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars) throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, getConfiguredInputFormat(job));
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on direct
   * memory will likely cause the map tasks to OOM when opening the region. This is done here
   * instead of in TableSnapshotRegionRecordReader in case an advanced user wants to override this
   * behavior in their job.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans per
   * snapshot. It bypasses HBase servers and reads directly from snapshot files.
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
    }

    resetCacheConfig(job.getConfiguration());
  }
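  // Illustrative usage (sketch): reading two snapshots, one full-table Scan each, restoring into
  // a hypothetical directory outside the HBase root dir:
  //
  //   Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
  //   snapshotScans.put("snapshotA", Collections.singletonList(new Scan()));
  //   snapshotScans.put("snapshotB", Collections.singletonList(new Scan()));
  //   TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyMapper.class,
  //     Text.class, Text.class, job, true, new Path("/tmp/snapshot-restore"));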

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads
   * directly from snapshot files.
   * @param snapshotName      The name of the snapshot (of a table) to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. The current
   *                          user should have write permissions to this directory, and it should
   *                          not be a subdirectory of the HBase root directory. After the job is
   *                          finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
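  // Illustrative usage (sketch): scanning the snapshot "mysnapshot" without going through the
  // RegionServers; the restore directory below is a made-up example path:
  //
  //   TableMapReduceUtil.initTableSnapshotMapperJob("mysnapshot", new Scan(), MyMapper.class,
  //     Text.class, Text.class, job, true, new Path("/tmp/snapshot-restore"));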

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads
   * directly from snapshot files.
   * @param snapshotName       The name of the snapshot (of a table) to read from.
   * @param scan               The scan instance with the columns, time range etc.
   * @param mapper             The mapper class to use.
   * @param outputKeyClass     The class of the output key.
   * @param outputValueClass   The class of the output value.
   * @param job                The current job to adjust. Make sure the passed job is carrying all
   *                           necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job classes via
   *                           the distributed cache (tmpjars).
   * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. The current
   *                           user should have write permissions to this directory, and it should
   *                           not be a subdirectory of the HBase root directory. After the job is
   *                           finished, the restore directory can be deleted.
   * @param splitAlgo          the split algorithm to use
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir, RegionSplitter.SplitAlgorithm splitAlgo,
    int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans            The list of {@link Scan} objects to read from.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans             The list of {@link Scan} objects to read from.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars)
    throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, addDependencyJars,
      true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans             The list of {@link Scan} objects to read from.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param initCredentials   whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars,
    boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
      scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }
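  // Illustrative usage (sketch): scanning two tables in one job. Each Scan names its table via
  // the scan attribute that MultiTableInputFormat reads (Scan.SCAN_ATTRIBUTES_TABLE_NAME):
  //
  //   List<Scan> scans = new ArrayList<>();
  //   for (String tableName : new String[] { "table1", "table2" }) {
  //     Scan scan = new Scan();
  //     scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
  //     scans.add(scan);
  //   }
  //   TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class, Text.class, Text.class, job);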

  private static void addTokenForJob(IOExceptionSupplier<Connection> connSupplier, User user,
    Job job) throws IOException, InterruptedException {
    try (Connection conn = connSupplier.get()) {
      TokenUtil.addTokenForJob(conn, user, job);
    }
  }

  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
          System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      User user = userProvider.getCurrent();
      try {
        // init credentials for remote cluster
        String outputCluster = job.getConfiguration().get(TableOutputFormat.OUTPUT_CLUSTER);
        if (!StringUtils.isBlank(outputCluster)) {
          addTokenForJob(() -> {
            URI uri;
            try {
              uri = new URI(outputCluster);
            } catch (URISyntaxException e) {
              throw new IOException("malformed connection uri: " + outputCluster
                + ", please check config " + TableOutputFormat.OUTPUT_CLUSTER, e);
            }
            return ConnectionFactory.createConnection(uri, job.getConfiguration());
          }, user, job);
        }
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        if (!StringUtils.isBlank(quorumAddress)) {
          addTokenForJob(() -> {
            Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
            return ConnectionFactory.createConnection(peerConf, user);
          }, user, job);
        }
        // init credentials for source cluster
        addTokenForJob(() -> ConnectionFactory.createConnection(job.getConfiguration()), user, job);
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user and
   * add it to the credentials for the given map reduce job.
   * @param job  The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf) throws IOException {
    initCredentialsForCluster(job, conf, null);
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user and
   * add it to the credentials for the given map reduce job.
   * @param job  The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @param uri  The connection uri for the given peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf, URI uri)
    throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        addTokenForJob(() -> ConnectionFactory.createConnection(uri, conf),
          userProvider.getCurrent(), job);
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        // Restore the interrupt status, matching initCredentials(Job) above.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Writes the given scan into a Base64 encoded string.
   * @param scan The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  public static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray()));
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   * @param base64 The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  public static Scan convertStringToScan(String base64) throws IOException {
    byte[] decoded = Base64.getDecoder().decode(base64);
    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
  }
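  // Illustrative round trip (sketch): this Base64 form is what TableInputFormat.SCAN holds in the
  // job configuration.
  //
  //   String serialized = TableMapReduceUtil.convertScanToString(new Scan());
  //   Scan copy = TableMapReduceUtil.convertStringToScan(serialized);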

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table   The output table.
   * @param reducer The reducer class to use.
   * @param job     The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job) throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }
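  // Illustrative usage (sketch): writing reducer output back to "outputtable", assuming a
  // hypothetical MyReducer that extends TableReducer<Text, Text, ImmutableBytesWritable>:
  //
  //   TableMapReduceUtil.initTableReducerJob("outputtable", MyReducer.class, job);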

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table       The output table.
   * @param reducer     The reducer class to use.
   * @param job         The current job to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, (URI) null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table         The output table.
   * @param reducer       The reducer class to use.
   * @param job           The current job to adjust. Make sure the passed job is carrying all
   *                      necessary HBase configuration.
   * @param partitioner   Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for output to the cluster
   *                      that is designated in <code>hbase-site.xml</code>. Set this String to the
   *                      zookeeper ensemble of an alternate remote cluster when you would have the
   *                      reduce write to a cluster other than the default; e.g. when copying tables
   *                      between clusters, the source would be designated by
   *                      <code>hbase-site.xml</code> and this param would have the ensemble address
   *                      of the remote cluster. The format to pass is particular:
   *                      <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:
   *                      &lt;zookeeper.znode.parent&gt;</code>, such as
   *                      <code>server,server2,server3:2181:/hbase</code>.
   * @throws IOException When determining the region count fails.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use
   *             {@link #initTableReducerJob(String, Class, Job, Class, URI)} instead, where we use
   *             the connection uri to specify the target cluster.
   */
  @Deprecated
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress     Distant cluster to write to; default is null for output to the cluster
   *                          that is designated in <code>hbase-site.xml</code>. Set this String to
   *                          the zookeeper ensemble of an alternate remote cluster when you would
   *                          have the reduce write to a cluster other than the default; e.g. when
   *                          copying tables between clusters, the source would be designated by
   *                          <code>hbase-site.xml</code> and this param would have the ensemble
   *                          address of the remote cluster. The format to pass is particular:
   *                          <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:
   *                          &lt;zookeeper.znode.parent&gt;</code>, such as
   *                          <code>server,server2,server3:2181:/hbase</code>.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use
   *             {@link #initTableReducerJob(String, Class, Job, Class, URI, boolean)} instead,
   *             where we use the connection uri to specify the target cluster.
   */
  @Deprecated
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, boolean addDependencyJars)
    throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, () -> {
      // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
      if (quorumAddress != null) {
        // Calling this will validate the format
        ZKConfig.validateClusterKey(quorumAddress);
        job.getConfiguration().set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
      }
    }, addDependencyJars);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table         The output table.
   * @param reducer       The reducer class to use.
   * @param job           The current job to adjust. Make sure the passed job is carrying all
   *                      necessary HBase configuration.
   * @param partitioner   Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param outputCluster The HBase cluster you want to write to. Default is null, which means
   *                      output to the same cluster you read from, i.e., the cluster specified by
   *                      the job's Configuration instance.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, URI outputCluster) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, outputCluster, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param outputCluster     The HBase cluster you want to write to. Default is null, which means
   *                          output to the same cluster you read from, i.e., the cluster specified
   *                          by the job's Configuration instance.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, URI outputCluster, boolean addDependencyJars) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, () -> {
      if (outputCluster != null) {
        ConnectionRegistryFactory.validate(outputCluster);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_CLUSTER, outputCluster.toString());
      }
    }, addDependencyJars);
  }
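  // Illustrative usage (sketch): writing to a different cluster identified by a connection URI;
  // the URI below is a made-up example and the exact scheme depends on the registry in use:
  //
  //   TableMapReduceUtil.initTableReducerJob("outputtable", MyReducer.class, job, null,
  //     URI.create("hbase+zk://zk1:2181/hbase"));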

  private static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, IOExceptionRunnable setOutputCluster, boolean addDependencyJars)
    throws IOException {
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) {
      job.setReducerClass(reducer);
    }
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    setOutputCluster.run();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table         The output table.
   * @param reducer       The reducer class to use.
   * @param job           The current job to adjust. Make sure the passed job is carrying all
   *                      necessary HBase configuration.
   * @param partitioner   Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for output to the cluster
   *                      that is designated in <code>hbase-site.xml</code>. Set this String to the
   *                      zookeeper ensemble of an alternate remote cluster when you would have the
   *                      reduce write to a cluster other than the default; e.g. when copying tables
   *                      between clusters, the source would be designated by
   *                      <code>hbase-site.xml</code> and this param would have the ensemble address
   *                      of the remote cluster. The format to pass is particular:
   *                      <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:
   *                      &lt;zookeeper.znode.parent&gt;</code>, such as
   *                      <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass   redefined hbase.regionserver.class
   * @param serverImpl    redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   * @deprecated Since 2.5.9, 2.6.1, 2.7.0, will be removed in 4.0.0. The {@code serverClass} and
   *             {@code serverImpl} do not take effect any more, just use
   *             {@link #initTableReducerJob(String, Class, Job, Class, String)} instead.
   * @see #initTableReducerJob(String, Class, Job, Class, String)
   */
  @Deprecated
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl)
    throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress     Distant cluster to write to; default is null for output to the cluster
   *                          that is designated in <code>hbase-site.xml</code>. Set this String to
   *                          the zookeeper ensemble of an alternate remote cluster when you would
   *                          have the reduce write to a cluster other than the default; e.g. when
   *                          copying tables between clusters, the source would be designated by
   *                          <code>hbase-site.xml</code> and this param would have the ensemble
   *                          address of the remote cluster. The format to pass is particular:
   *                          <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:
   *                          &lt;zookeeper.znode.parent&gt;</code>, such as
   *                          <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass       redefined hbase.regionserver.class
   * @param serverImpl        redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   * @deprecated Since 2.5.9, 2.6.1, 2.7.0, will be removed in 4.0.0. The {@code serverClass} and
   *             {@code serverImpl} do not take effect any more, just use
   *             {@link #initTableReducerJob(String, Class, Job, Class, String, boolean)} instead.
   * @see #initTableReducerJob(String, Class, Job, Class, String, boolean)
   */
  @Deprecated
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl,
    boolean addDependencyJars) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress, addDependencyJars);
  }

  /**
   * Ensures that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job) throws IOException {
    int regions = getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job) throws IOException {
    job.setNumReduceTasks(getRegionCount(job.getConfiguration(), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration. Higher caching values
   * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached
   * rows.
   * @param job       The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this class and its mapred
   * counterpart. It is also of use to external tools that need to build a MapReduce job that
   * interacts with HBase but want fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJarsForClasses(conf,
      // explicitly pull a class from each module
      org.apache.hadoop.hbase.HConstants.class, // hbase-common
      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
      org.apache.hadoop.hbase.client.Put.class, // hbase-client
      org.apache.hadoop.hbase.ipc.RpcServer.class, // hbase-server
      org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat
      org.apache.hadoop.hbase.mapreduce.JobUtil.class, // hbase-hadoop2-compat
      org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-mapreduce
      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class, // hbase-metrics
      org.apache.hadoop.hbase.metrics.Snapshot.class, // hbase-metrics-api
      org.apache.hadoop.hbase.replication.ReplicationUtils.class, // hbase-replication
      org.apache.hadoop.hbase.http.HttpServer.class, // hbase-http
      org.apache.hadoop.hbase.procedure2.Procedure.class, // hbase-procedure
      org.apache.hadoop.hbase.zookeeper.ZKWatcher.class, // hbase-zookeeper
      org.apache.hbase.thirdparty.com.google.common.collect.Lists.class, // hb-shaded-miscellaneous
      org.apache.hbase.thirdparty.com.google.gson.GsonBuilder.class, // hbase-shaded-gson
      org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class, // hb-sh-protobuf
      org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty
      org.apache.hadoop.hbase.unsafe.HBasePlatformDependent.class, // hbase-unsafe
      org.apache.zookeeper.ZooKeeper.class, // zookeeper
      com.codahale.metrics.MetricRegistry.class, // metrics-core
      org.apache.commons.lang3.ArrayUtils.class, // commons-lang
      io.opentelemetry.api.trace.Span.class, // opentelemetry-api
      io.opentelemetry.semconv.trace.attributes.SemanticAttributes.class, // opentelemetry-semconv
      io.opentelemetry.context.Context.class); // opentelemetry-context
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. Also
   * exposed to shell scripts via `bin/hbase mapredcp`.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
    if (paths.isEmpty()) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }

  /**
   * Add the HBase dependency jars as well as jars for any of the configured job classes to the job
   * configuration, so that JobClient will ship them to the cluster and add them to the
   * DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJarsForClasses(job.getConfiguration(),
        // when making changes here, consider also mapred.TableMapReduceUtil
        // pull job classes
        job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getInputFormatClass(),
        job.getOutputKeyClass(), job.getOutputValueClass(), job.getOutputFormatClass(),
        job.getPartitionerClass(), job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration such that JobClient will
   * ship them to the cluster and add them to the DistributedCache. N.B. that this method at most
   * adds one jar per class given. If there is more than one jar available containing a class with
   * the same name as a given class, we don't define which of those jars might be chosen.
   * @param conf    The Hadoop Configuration to modify
   * @param classes will add just those dependencies needed to find the given classes
   * @throws IOException if an underlying library call fails.
   */
  @InterfaceAudience.Private
  public static void addDependencyJarsForClasses(Configuration conf, Class<?>... classes)
    throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents to jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) {
      return;
    }
    conf.set("tmpjars", jars.stream().collect(Collectors.joining(",")));
  }
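  // Illustrative usage (sketch): shipping one extra third-party jar by naming a class it
  // contains, mirroring how MetricRegistry is added for snapshot jobs above:
  //
  //   TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
  //     com.codahale.metrics.MetricRegistry.class);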

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in a directory in
   * the classpath, it creates a Jar on the fly with the contents of the directory and returns the
   * path to that Jar. If a Jar is created, it is created in the system temporary directory.
   * Otherwise, returns an existing jar that contains a class of the same name. Maintains a mapping
   * from jar contents to the tmp jar created.
   * @param my_class        the class to find.
   * @param fs              the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
    Map<String, String> packagedClasses) throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files contained in
   * <code>jar</code>.
   * @param jar             The jar whose contents to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses)
    throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return a jar file, even if
   * that is not the first thing on the class path that has a class with the same name. Looks first
   * on the classpath and then in the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
    throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job configuration
   * contexts (HBASE-8140) and also for testing on MRv2. Check if we have HADOOP-9426.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // wrap any failure (e.g. a reflection problem) and rethrow as unchecked
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}