/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import com.codahale.metrics.MetricRegistry;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
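 * <p>
 * A minimal usage sketch (the mapper, reducer, and table names below are placeholders, not part of
 * this API):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "example-scan-job");
 * Scan scan = new Scan();
 * scan.addFamily(Bytes.toBytes("cf"));
 * TableMapReduceUtil.initTableMapperJob("input_table", scan, MyTableMapper.class,
 *   ImmutableBytesWritable.class, Put.class, job);
 * TableMapReduceUtil.initTableReducerJob("output_table", MyTableReducer.class, job);
 * job.waitForCompletion(true);
 * }</pre>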
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
public class TableMapReduceUtil {
  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);
  public static final String TABLE_INPUT_CLASS_KEY = "hbase.table.input.class";

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            The table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            The table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(), scan, mapper, outputKeyClass, outputValueClass, job,
      true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table            Binary representation of the table name to read from.
   * @param scan             The scan instance with the columns, time range etc.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job) throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
    throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, true, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param initCredentials   whether to initialize hbase auth credentials for the job
   * @param inputFormatClass  the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, boolean initCredentials,
    Class<? extends InputFormat> inputFormatClass) throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             Binary representation of the table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
    throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             Binary representation of the table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars) throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, getConfiguredInputFormat(job));
  }

  /**
   * @return {@link TableInputFormat} unless the Configuration has something else set at
   *         {@link #TABLE_INPUT_CLASS_KEY}.
   */
  private static Class<? extends InputFormat> getConfiguredInputFormat(Job job) {
    return (Class<? extends InputFormat>) job.getConfiguration().getClass(TABLE_INPUT_CLASS_KEY,
      TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table             The table name to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars) throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, getConfiguredInputFormat(job));
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on direct
   * memory will likely cause the map tasks to OOM when opening the region. This is done here
   * instead of in TableSnapshotRegionRecordReader in case an advanced user wants to override this
   * behavior in their job.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans per
   * snapshot. It bypasses HBase servers and reads directly from snapshot files.
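   * <p>
   * An illustrative sketch (snapshot names, scans, and the restore directory below are
   * placeholders):
   *
   * <pre>{@code
   * Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   * snapshotScans.put("snapshot1", Collections.singletonList(new Scan()));
   * snapshotScans.put("snapshot2", Arrays.asList(new Scan(), new Scan()));
   * TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMapper.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot_restore"));
   * }</pre>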
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
    }

    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads
   * directly from snapshot files.
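   * <p>
   * An illustrative sketch (the snapshot name and restore directory below are placeholders):
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", new Scan(), MyTableMapper.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot_restore"));
   * }</pre>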
   * @param snapshotName      The name of the snapshot (of a table) to read from.
   * @param scan              The scan instance with the columns, time range etc.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. Current user
   *                          should have write permissions to this directory, and this should not
   *                          be a subdirectory of rootdir. After the job is finished, restore
   *                          directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads
   * directly from snapshot files.
   * @param snapshotName       The name of the snapshot (of a table) to read from.
   * @param scan               The scan instance with the columns, time range etc.
   * @param mapper             The mapper class to use.
   * @param outputKeyClass     The class of the output key.
   * @param outputValueClass   The class of the output value.
   * @param job                The current job to adjust. Make sure the passed job is carrying all
   *                           necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job classes via
   *                           the distributed cache (tmpjars).
   * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. Current user
   *                           should have write permissions to this directory, and this should not
   *                           be a subdirectory of rootdir. After the job is finished, restore
   *                           directory can be deleted.
   * @param splitAlgo          the algorithm used to split each region into input splits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    Job job, boolean addDependencyJars, Path tmpRestoreDir, RegionSplitter.SplitAlgorithm splitAlgo,
    int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
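   * <p>
   * A minimal sketch; each {@link Scan} is expected to carry the table it targets via
   * {@code Scan.SCAN_ATTRIBUTES_TABLE_NAME} (table and mapper names below are placeholders):
   *
   * <pre>{@code
   * Scan scan1 = new Scan();
   * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table1"));
   * Scan scan2 = new Scan();
   * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table2"));
   * TableMapReduceUtil.initTableMapperJob(Arrays.asList(scan1, scan2), MyTableMapper.class,
   *   ImmutableBytesWritable.class, Result.class, job);
   * }</pre>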
   * @param scans            The list of {@link Scan} objects to read from.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job to adjust. Make sure the passed job is carrying all
   *                         necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans             The list of {@link Scan} objects to read from.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars)
    throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, addDependencyJars,
      true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set up the job.
   * @param scans             The list of {@link Scan} objects to read from.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param initCredentials   whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars,
    boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
      scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }

  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
          System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for remote cluster
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
            quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
          Connection peerConn = ConnectionFactory.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user and
   * add it to the credentials for the given map reduce job. The quorumAddress is the key to the ZK
   * ensemble, which contains: hbase.zookeeper.quorum, hbase.zookeeper.client.port and
   * zookeeper.znode.parent
   * @param job           The job that requires the permission.
   * @param quorumAddress string that contains the 3 required configurations
   * @throws IOException When the authentication token cannot be obtained.
   * @deprecated Since 1.2.0 and will be removed in 3.0.0. Use
   *             {@link #initCredentialsForCluster(Job, Configuration)} instead.
   * @see #initCredentialsForCluster(Job, Configuration)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-14886">HBASE-14886</a>
   */
  @Deprecated
  public static void initCredentialsForCluster(Job job, String quorumAddress) throws IOException {
    Configuration peerConf =
      HBaseConfiguration.createClusterConf(job.getConfiguration(), quorumAddress);
    initCredentialsForCluster(job, peerConf);
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user and
   * add it to the credentials for the given map reduce job.
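   * <p>
   * For example, to also obtain tokens for a peer cluster (the quorum string below is a
   * placeholder):
   *
   * <pre>{@code
   * Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
   *   "peer-zk1,peer-zk2,peer-zk3:2181:/hbase");
   * TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
   * }</pre>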
   * @param job  The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Connection peerConn = ConnectionFactory.createConnection(conf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        // Restore the interrupt status rather than clearing it
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Writes the given scan into a Base64 encoded string.
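   * <p>
   * A round-trip sketch:
   *
   * <pre>{@code
   * String encoded = TableMapReduceUtil.convertScanToString(scan);
   * Scan decoded = TableMapReduceUtil.convertStringToScan(encoded);
   * }</pre>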
   * @param scan The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  public static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray()));
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   * @param base64 The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  public static Scan convertStringToScan(String base64) throws IOException {
    byte[] decoded = Base64.getDecoder().decode(base64);
    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the Job.
   * @param table   The output table.
   * @param reducer The reducer class to use.
   * @param job     The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job) throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the Job.
   * @param table       The output table.
   * @param reducer     The reducer class to use.
   * @param job         The current job to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the Job.
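   * <p>
   * For example, to have the reducers write to a cluster other than the one in the job
   * configuration (the quorum string and reducer name below are placeholders):
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableReducerJob("output_table", MyTableReducer.class, job,
   *   HRegionPartitioner.class, "remote-zk1,remote-zk2,remote-zk3:2181:/hbase", null, null);
   * }</pre>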
   * @param table         The output table.
   * @param reducer       The reducer class to use.
   * @param job           The current job to adjust. Make sure the passed job is carrying all
   *                      necessary HBase configuration.
   * @param partitioner   Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for output to the cluster
   *                      that is designated in <code>hbase-site.xml</code>. Set this String to the
   *                      zookeeper ensemble of an alternate remote cluster when you would have the
   *                      reduce write to a cluster other than the default; e.g. when copying tables
   *                      between clusters, the source would be designated by
   *                      <code>hbase-site.xml</code> and this param would have the ensemble address
   *                      of the remote cluster. The format to pass is particular. Pass
   *                      <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>,
   *                      such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass   redefined hbase.regionserver.class
   * @param serverImpl    redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl)
    throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress, serverClass, serverImpl,
      true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the Job.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param quorumAddress     Distant cluster to write to; default is null for output to the cluster
   *                          that is designated in <code>hbase-site.xml</code>. Set this String to
   *                          the zookeeper ensemble of an alternate remote cluster when you would
   *                          have the reduce write to a cluster other than the default; e.g. when
   *                          copying tables between clusters, the source would be designated by
   *                          <code>hbase-site.xml</code> and this param would have the ensemble
   *                          address of the remote cluster. The format to pass is particular. Pass
   *                          <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>,
   *                          such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass       redefined hbase.regionserver.class
   * @param serverImpl        redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer,
    Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl,
    boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKConfig.validateClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }

  /**
   * Ensures that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job) throws IOException {
    int regions = getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job) throws IOException {
    job.setNumReduceTasks(getRegionCount(job.getConfiguration(), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration. Higher caching values
   * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached
   * rows.
   * @param job       The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this class and its mapred
   * counterpart. It is also of use to external tools that need to build a MapReduce job that
   * interacts with HBase but want fine-grained control over the jars shipped to the cluster.
   * </p>
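   * <p>
   * A sketch of how an external tool might combine it with
   * {@link #buildDependencyClasspath(Configuration)}:
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * TableMapReduceUtil.addHBaseDependencyJars(conf);
   * String classpath = TableMapReduceUtil.buildDependencyClasspath(conf);
   * }</pre>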
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJarsForClasses(conf,
      // explicitly pull a class from each module
      org.apache.hadoop.hbase.HConstants.class, // hbase-common
      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
      org.apache.hadoop.hbase.client.Put.class, // hbase-client
      org.apache.hadoop.hbase.ipc.RpcServer.class, // hbase-server
      org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat
      org.apache.hadoop.hbase.mapreduce.JobUtil.class, // hbase-hadoop2-compat
      org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-mapreduce
      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class, // hbase-metrics
      org.apache.hadoop.hbase.metrics.Snapshot.class, // hbase-metrics-api
      org.apache.hadoop.hbase.replication.ReplicationUtils.class, // hbase-replication
      org.apache.hadoop.hbase.http.HttpServer.class, // hbase-http
      org.apache.hadoop.hbase.procedure2.Procedure.class, // hbase-procedure
      org.apache.hadoop.hbase.zookeeper.ZKWatcher.class, // hbase-zookeeper
      org.apache.hbase.thirdparty.com.google.common.collect.Lists.class, // hb-shaded-miscellaneous
      org.apache.hbase.thirdparty.com.google.gson.GsonBuilder.class, // hbase-shaded-gson
      org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class, // hb-sh-protobuf
      org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty
      org.apache.hadoop.hbase.unsafe.HBasePlatformDependent.class, // hbase-unsafe
      org.apache.zookeeper.ZooKeeper.class, // zookeeper
      com.codahale.metrics.MetricRegistry.class, // metrics-core
      org.apache.commons.lang3.ArrayUtils.class, // commons-lang
      io.opentelemetry.api.trace.Span.class, // opentelemetry-api
      io.opentelemetry.semconv.trace.attributes.SemanticAttributes.class, // opentelemetry-semconv
      io.opentelemetry.context.Context.class); // opentelemetry-context
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. Also
   * exposed to shell scripts via {@code bin/hbase mapredcp}.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
    if (paths.isEmpty()) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }

  /**
   * Add the HBase dependency jars as well as jars for any of the configured job classes to the job
   * configuration, so that JobClient will ship them to the cluster and add them to the
   * DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJarsForClasses(job.getConfiguration(),
        // when making changes here, consider also mapred.TableMapReduceUtil
        // pull job classes
        job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getInputFormatClass(),
        job.getOutputKeyClass(), job.getOutputValueClass(), job.getOutputFormatClass(),
        job.getPartitionerClass(), job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration such that JobClient will
   * ship them to the cluster and add them to the DistributedCache.
   * @deprecated since 1.3.0 and will be removed in 3.0.0. Use {@link #addDependencyJars(Job)}
   *             instead.
   * @see #addDependencyJars(Job)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-8386">HBASE-8386</a>
   */
  @Deprecated
  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
    LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it"
      + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) "
      + "instead. See HBASE-8386 for more details.");
    addDependencyJarsForClasses(conf, classes);
  }

  /**
   * Add the jars containing the given classes to the job's configuration such that JobClient will
   * ship them to the cluster and add them to the DistributedCache. N.B. that this method at most
   * adds one jar per class given. If there is more than one jar available containing a class with
   * the same name as a given class, we don't define which of those jars might be chosen.
   * @param conf    The Hadoop Configuration to modify
   * @param classes will add just those dependencies needed to find the given classes
   * @throws IOException if an underlying library call fails.
   */
  @InterfaceAudience.Private
  public static void addDependencyJarsForClasses(Configuration conf, Class<?>... classes)
    throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // Add jars as we find them to a map of contents to jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in a directory in
   * the classpath, it creates a Jar on the fly with the contents of the directory and returns the
   * path to that Jar. If a Jar is created, it is created in the system temporary directory.
   * Otherwise, returns an existing jar that contains a class of the same name. Maintains a mapping
   * from jar contents to the tmp jar created.
   * @param my_class        the class to find.
   * @param fs              the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
    Map<String, String> packagedClasses) throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files contained in
   * <code>jar</code>.
   * @param jar             The jar whose content to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses)
    throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return a jar file, even if
   * that is not the first thing on the class path that has a class with the same name. Looks first
   * on the classpath and then in the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
    throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job configuration
   * contexts (HBASE-8140) and also for testing on MRv2. Check if we have HADOOP-9426.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // toss all other exceptions, related to reflection failure
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}