/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import com.codahale.metrics.MetricRegistry;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
public class TableMapReduceUtil {
  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);
  public static final String TABLE_INPUT_CLASS_KEY = "hbase.table.input.class";

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            The table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, true);
  }
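
  // A minimal usage sketch (not part of the original class): wiring a map-only scan job with the
  // overload above. The table name "mytable" and MyMapper are hypothetical placeholders, and
  // Result is org.apache.hadoop.hbase.client.Result (not imported by this class).
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "scan mytable");
  //   job.setJarByClass(MyMapper.class);
  //   Scan scan = new Scan();
  //   scan.setCaching(500);        // larger caching => fewer RPCs per map task
  //   scan.setCacheBlocks(false);  // avoid polluting the region server block cache from MR
  //   TableMapReduceUtil.initTableMapperJob("mytable", scan, MyMapper.class,
  //     ImmutableBytesWritable.class, Result.class, job);
  //   job.setNumReduceTasks(0);    // map-only
  //   job.waitForCompletion(true);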

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            The table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(), scan, mapper, outputKeyClass, outputValueClass, job,
      true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            Binary representation of the table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormatClass  the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
    throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, true, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param initCredentials   whether to initialize hbase auth credentials for the job
   * @param inputFormatClass  the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, boolean initCredentials,
    Class<? extends InputFormat> inputFormatClass) throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             Binary representation of the table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
    throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             Binary representation of the table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars) throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, getConfiguredInputFormat(job));
  }

  /**
   * @return {@link TableInputFormat} .class unless Configuration has something else at
   *         {@link #TABLE_INPUT_CLASS_KEY}.
   */
  private static Class<? extends InputFormat> getConfiguredInputFormat(Job job) {
    return (Class<? extends InputFormat>) job.getConfiguration().getClass(TABLE_INPUT_CLASS_KEY,
      TableInputFormat.class);
  }
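
  // Usage sketch: the input format used by the single-table overloads can be swapped by setting
  // TABLE_INPUT_CLASS_KEY before calling initTableMapperJob. MyTableInputFormat is a hypothetical
  // subclass of TableInputFormat.
  //
  //   job.getConfiguration().setClass(TABLE_INPUT_CLASS_KEY,
  //     MyTableInputFormat.class, InputFormat.class);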

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars) throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, getConfiguredInputFormat(job));
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on direct
   * memory will likely cause the map tasks to OOM when opening the region. This is done here
   * instead of in TableSnapshotRegionRecordReader in case an advanced user wants to override this
   * behavior in their job.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans per
   * snapshot. It bypasses hbase servers and reads directly from snapshot files.
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
    }

    resetCacheConfig(job.getConfiguration());
  }
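
  // Usage sketch for the multi-snapshot overload above: map each snapshot name to the scans to run
  // against it. The snapshot names, column family and restore directory below are hypothetical;
  // the restore directory must be writable by the submitting user and outside the HBase rootdir.
  //
  //   Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
  //   snapshotScans.put("snapshot_a", Arrays.asList(new Scan().addFamily(Bytes.toBytes("cf"))));
  //   snapshotScans.put("snapshot_b", Arrays.asList(new Scan()));
  //   TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyMapper.class,
  //     ImmutableBytesWritable.class, Result.class, job, true,
  //     new Path("/tmp/snapshot-restore"));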

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
   * @param snapshotName      The name of the snapshot (of a table) to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. Current user
   *                          should have write permissions to this directory, and this should not
   *                          be a subdirectory of rootdir. After the job is finished, the restore
   *                          directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
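
  // Usage sketch: reading a snapshot instead of the live table. "my_snapshot" and the restore
  // directory are hypothetical; since the job reads HFiles directly, the submitting user needs
  // read access to the snapshot files and write access to the restore directory.
  //
  //   TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", new Scan(), MyMapper.class,
  //     ImmutableBytesWritable.class, Result.class, job, true,
  //     new Path("/tmp/snapshot-restore"));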

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
   * @param snapshotName       The name of the snapshot (of a table) to read from.
   * @param scan               The scan instance with the columns, time range etc.
   * @param mapper             The mapper class to use.
   * @param outputKeyClass     The class of the output key.
   * @param outputValueClass   The class of the output value.
   * @param job                The current job to adjust. Make sure the passed job is carrying all
   *                           necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job classes via
   *                           the distributed cache (tmpjars).
   * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. Current user
   *                           should have write permissions to this directory, and this should not
   *                           be a subdirectory of rootdir. After the job is finished, the restore
   *                           directory can be deleted.
   * @param splitAlgo          the algorithm used to split each region into input splits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir, RegionSplitter.SplitAlgorithm splitAlgo,
    int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
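
  // Usage sketch for the variant above: fan each region out into several input splits, here with a
  // uniform split algorithm. The snapshot name, restore directory and the value 4 are arbitrary
  // example choices.
  //
  //   TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", new Scan(), MyMapper.class,
  //     ImmutableBytesWritable.class, Result.class, job, true,
  //     new Path("/tmp/snapshot-restore"), new RegionSplitter.UniformSplit(), 4);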

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans            The list of {@link Scan} objects to read from.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans             The list of {@link Scan} objects to read from.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars)
    throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, addDependencyJars,
      true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans             The list of {@link Scan} objects to read from.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param initCredentials   whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars,
    boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
      scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }
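
  // Usage sketch for the multi-scan overloads: with MultiTableInputFormat each Scan carries the
  // table it targets via the SCAN_ATTRIBUTES_TABLE_NAME attribute. The table names below are
  // hypothetical.
  //
  //   List<Scan> scans = new ArrayList<>();
  //   Scan scan1 = new Scan();
  //   scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table1"));
  //   scans.add(scan1);
  //   Scan scan2 = new Scan();
  //   scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table2"));
  //   scans.add(scan2);
  //   TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class,
  //     ImmutableBytesWritable.class, Result.class, job);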

  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
          System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for remote cluster
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
            quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
          Connection peerConn = ConnectionFactory.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user and
   * add it to the credentials for the given map reduce job.
   * @param job  The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Connection peerConn = ConnectionFactory.createConnection(conf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        // restore the interrupt status; Thread.interrupted() would only have cleared it
        Thread.currentThread().interrupt();
      }
    }
  }
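
  // Usage sketch: obtaining a delegation token for a second, remote cluster (e.g. the sink of a
  // copy-table style job). The cluster key below is a hypothetical example of the
  // quorum:client-port:znode-parent format.
  //
  //   Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
  //     "peer1,peer2,peer3:2181:/hbase");
  //   TableMapReduceUtil.initCredentialsForCluster(job, peerConf);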

  /**
   * Writes the given scan into a Base64 encoded string.
   * @param scan The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  public static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray()));
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   * @param base64 The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  public static Scan convertStringToScan(String base64) throws IOException {
    byte[] decoded = Base64.getDecoder().decode(base64);
    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
  }
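
  // Usage sketch: the two helpers above are inverses of each other; the encoded form is what ends
  // up in the job configuration under TableInputFormat.SCAN. The column family "cf" is a
  // hypothetical example.
  //
  //   Scan scan = new Scan().addFamily(Bytes.toBytes("cf"));
  //   String encoded = TableMapReduceUtil.convertScanToString(scan);
  //   Scan roundTripped = TableMapReduceUtil.convertStringToScan(encoded);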

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table   The output table.
   * @param reducer The reducer class to use.
   * @param job     The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job) throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table       The output table.
   * @param reducer     The reducer class to use.
   * @param job         The current job to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }
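
  // Usage sketch: a reduce phase that writes Puts back to an (assumed) output table. The reducer
  // is a hypothetical TableReducer emitting (ImmutableBytesWritable, Put) pairs; when
  // HRegionPartitioner is passed, the delegated setup also caps the number of reduce tasks at the
  // table's region count.
  //
  //   TableMapReduceUtil.initTableReducerJob("output_table", MyTableReducer.class, job,
  //     HRegionPartitioner.class);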

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table         The output table.
   * @param reducer       The reducer class to use.
   * @param job           The current job to adjust. Make sure the passed job is carrying all
   *                      necessary HBase configuration.
   * @param partitioner   Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for output to the cluster
   *                      that is designated in <code>hbase-site.xml</code>. Set this String to the
   *                      zookeeper ensemble of an alternate remote cluster when you would have the
   *                      reduce write to a cluster other than the default; e.g. when copying tables
   *                      between clusters, the source would be designated by
   *                      <code>hbase-site.xml</code> and this param would have the ensemble address
   *                      of the remote cluster. The format to pass is particular. Pass
   *                      <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:
   *                      &lt;zookeeper.znode.parent&gt;</code>, such as
   *                      <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass   redefined hbase.regionserver.class
   * @param serverImpl    redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl)
    throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress, serverClass, serverImpl,
      true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress     Distant cluster to write to; default is null for output to the cluster
   *                          that is designated in <code>hbase-site.xml</code>. Set this String to
   *                          the zookeeper ensemble of an alternate remote cluster when you would
   *                          have the reduce write to a cluster other than the default; e.g. when
   *                          copying tables between clusters, the source would be designated by
   *                          <code>hbase-site.xml</code> and this param would have the ensemble
   *                          address of the remote cluster. The format to pass is particular. Pass
   *                          <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:
   *                          &lt;zookeeper.znode.parent&gt;</code>, such as
   *                          <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass       redefined hbase.regionserver.class
   * @param serverImpl        redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl,
    boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKConfig.validateClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }
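
  // Usage sketch: directing the reduce output at a remote cluster. The cluster key must follow the
  // quorum:client-port:znode-parent format described in the javadoc above; the hosts and the table
  // name here are hypothetical.
  //
  //   TableMapReduceUtil.initTableReducerJob("output_table", MyTableReducer.class, job, null,
  //     "remote1,remote2,remote3:2181:/hbase", null, null);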

  /**
   * Ensures that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job) throws IOException {
    int regions = getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job) throws IOException {
    job.setNumReduceTasks(getRegionCount(job.getConfiguration(), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration. Higher caching values
   * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached
   * rows.
   * @param job       The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this class and its mapred
   * counterpart. It is also of use to external tools that need to build a MapReduce job that
   * interacts with HBase but want fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJarsForClasses(conf,
      // explicitly pull a class from each module
      org.apache.hadoop.hbase.HConstants.class, // hbase-common
      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
      org.apache.hadoop.hbase.client.Put.class, // hbase-client
      org.apache.hadoop.hbase.ipc.RpcServer.class, // hbase-server
      org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat
      org.apache.hadoop.hbase.mapreduce.JobUtil.class, // hbase-hadoop2-compat
      org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-mapreduce
      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class, // hbase-metrics
      org.apache.hadoop.hbase.metrics.Snapshot.class, // hbase-metrics-api
      org.apache.hadoop.hbase.replication.ReplicationUtils.class, // hbase-replication
      org.apache.hadoop.hbase.http.HttpServer.class, // hbase-http
      org.apache.hadoop.hbase.procedure2.Procedure.class, // hbase-procedure
      org.apache.hadoop.hbase.zookeeper.ZKWatcher.class, // hbase-zookeeper
      org.apache.hbase.thirdparty.com.google.common.collect.Lists.class, // hb-shaded-miscellaneous
      org.apache.hbase.thirdparty.com.google.gson.GsonBuilder.class, // hbase-shaded-gson
      org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class, // hb-sh-protobuf
      org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty
      org.apache.hadoop.hbase.unsafe.HBasePlatformDependent.class, // hbase-unsafe
      org.apache.zookeeper.ZooKeeper.class, // zookeeper
      com.codahale.metrics.MetricRegistry.class, // metrics-core
      org.apache.commons.lang3.ArrayUtils.class, // commons-lang
      io.opentelemetry.api.trace.Span.class, // opentelemetry-api
      io.opentelemetry.semconv.trace.attributes.SemanticAttributes.class, // opentelemetry-semconv
      io.opentelemetry.context.Context.class); // opentelemetry-context
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. Also
   * exposed to shell scripts via `bin/hbase mapredcp`.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
    if (paths.isEmpty()) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }
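
  // Usage sketch: after addDependencyJars(job) has populated "tmpjars", this turns it into a local
  // classpath string (the same form exposed to shell scripts via `bin/hbase mapredcp`).
  //
  //   TableMapReduceUtil.addDependencyJars(job);
  //   String cp = TableMapReduceUtil.buildDependencyClasspath(job.getConfiguration());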

  /**
   * Add the HBase dependency jars as well as jars for any of the configured job classes to the job
   * configuration, so that JobClient will ship them to the cluster and add them to the
   * DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJarsForClasses(job.getConfiguration(),
        // when making changes here, consider also mapred.TableMapReduceUtil
        // pull job classes
        job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getInputFormatClass(),
        job.getOutputKeyClass(), job.getOutputValueClass(), job.getOutputFormatClass(),
        job.getPartitionerClass(), job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration such that JobClient will
   * ship them to the cluster and add them to the DistributedCache. N.B. that this method at most
   * adds one jar per class given. If there is more than one jar available containing a class with
   * the same name as a given class, we don't define which of those jars might be chosen.
   * @param conf    The Hadoop Configuration to modify
   * @param classes will add just those dependencies needed to find the given classes
   * @throws IOException if an underlying library call fails.
   */
  @InterfaceAudience.Private
  public static void addDependencyJarsForClasses(Configuration conf, Class<?>... classes)
    throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents to jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }
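
  // Usage sketch: shipping an extra, non-HBase dependency with the job. Any class from the jar you
  // need works as the probe; com.example.ThirdPartyUtil is a hypothetical placeholder.
  //
  //   TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
  //     com.example.ThirdPartyUtil.class);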

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in a directory in
   * the classpath, it creates a Jar on the fly with the contents of the directory and returns the
   * path to that Jar. If a Jar is created, it is created in the system temporary directory.
   * Otherwise, returns an existing jar that contains a class of the same name. Maintains a mapping
   * from jar contents to the tmp jar created.
   * @param my_class        the class to find.
   * @param fs              the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
    Map<String, String> packagedClasses) throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files contained in
   * <code>jar</code>.
   * @param jar             The jar whose contents to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses)
    throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return a jar file, even if
   * that is not the first thing on the class path that has a class with the same name. Looks first
   * on the classpath and then in the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
    throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job configuration
   * contexts (HBASE-8140) and also for testing on MRv2. Check if we have HADOOP-9426.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // rethrow anything else as unchecked; typically a reflection failure inside JarFinder
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}